Permalink
Browse files

Misc code style tweaks (spaces, pep8)

  • Loading branch information...
1 parent ad57e78 commit a2bbccdcc67b7f2208c0193dc7c96bcaa534b8a9 @AntonioChen AntonioChen committed with Feb 12, 2013
Showing with 935 additions and 900 deletions.
  1. +1 −0 .gitignore
  2. +7 −7 doc/api.rst
  3. +0 −1 doc/conf.py
  4. +57 −47 doc/sphinx_local_hooks.py
  5. +68 −65 doc/sphinx_text_to_md.py
  6. +46 −44 setup.py
  7. +396 −390 skydrive/api_v5.py
  8. +245 −234 skydrive/cli_tool.py
  9. +115 −112 skydrive/conf.py
View
1 .gitignore
@@ -6,3 +6,4 @@
/README.txt
*.pyc
*.pyo
+.idea
View
14 doc/api.rst
@@ -1,12 +1,12 @@
.. automodule:: skydrive.api_v5
- :exclude-members: SkyDriveHTTPClient, SkyDriveAuth,
- SkyDriveAPIWrapper, SkyDriveAPI, PersistentSkyDriveAPI
+:exclude-members: SkyDriveHTTPClient, SkyDriveAuth,
+ SkyDriveAPIWrapper, SkyDriveAPI, PersistentSkyDriveAPI
- .. autoclass:: SkyDriveHTTPClient
- .. autoclass:: SkyDriveAuth
- .. autoclass:: SkyDriveAPIWrapper
- .. autoclass:: SkyDriveAPI
+ .. autoclass:: SkyDriveHTTPClient
+.. autoclass:: SkyDriveAuth
+.. autoclass:: SkyDriveAPIWrapper
+.. autoclass:: SkyDriveAPI
.. autoclass:: PersistentSkyDriveAPI
- :exclude-members: rx:^from_conf|conf_(path_default|update_keys)$
+:exclude-members: rx:^from_conf|conf_(path_default|update_keys)$
:inherited-members:
View
1 doc/conf.py
@@ -9,7 +9,6 @@
sys.path.insert(0, abspath('..')) # for module itself
sys.path.append(abspath('.')) # for extenstions
-
needs_sphinx = '1.1'
extensions = ['sphinx.ext.autodoc', 'sphinx_local_hooks']
View
104 doc/sphinx_local_hooks.py
@@ -5,71 +5,81 @@
from collections import Iterable
import os, sys, types, re
-
from sphinx.ext.autodoc import Documenter
_autodoc_add_line = Documenter.add_line
+
@ft.wraps(_autodoc_add_line)
def autodoc_add_line(self, line, *argz, **kwz):
- tee = self.env.app.config.autodoc_dump_rst
- if tee:
- tee_line = self.indent + line
- if isinstance(tee, file): tee.write(tee_line + '\n')
- elif tee is True: print(tee_line)
- else:
- raise ValueError( 'Unrecognized'
- ' value for "autodoc_dump_rst" option: {!r}'.format(tee) )
- return _autodoc_add_line(self, line, *argz, **kwz)
+ tee = self.env.app.config.autodoc_dump_rst
+ if tee:
+ tee_line = self.indent + line
+ if isinstance(tee, file):
+ tee.write(tee_line + '\n')
+ elif tee is True:
+ print(tee_line)
+ else:
+ raise ValueError('Unrecognized'
+ ' value for "autodoc_dump_rst" option: {!r}'.format(tee))
+ return _autodoc_add_line(self, line, *argz, **kwz)
+
Documenter.add_line = autodoc_add_line
def process_docstring(app, what, name, obj, options, lines):
- if not lines: return
+ if not lines: return
- i, ld = 0, dict(enumerate(lines)) # to allow arbitrary peeks
- i_max = max(ld)
+ i, ld = 0, dict(enumerate(lines)) # to allow arbitrary peeks
+ i_max = max(ld)
- def process_line(i):
- line, i_next = ld[i], i + 1
- while i_next not in ld and i_next <= i_max: i_next += 1
- line_next = ld.get(i_next)
+ def process_line(i):
+ line, i_next = ld[i], i + 1
+ while i_next not in ld and i_next <= i_max: i_next += 1
+ line_next = ld.get(i_next)
- if line_next and line_next[0] in u' \t': # tabbed continuation of the sentence
- ld[i] = u'{} {}'.format(line, line_next.strip())
- del ld[i_next]
- process_line(i)
- elif line.endswith(u'.') or (line_next and line_next[0].isupper()): ld[i+0.5] = u''
+ if line_next and line_next[0] in u' \t': # tabbed continuation of the sentence
+ ld[i] = u'{} {}'.format(line, line_next.strip())
+ del ld[i_next]
+ process_line(i)
+ elif line.endswith(u'.') or (line_next and line_next[0].isupper()):
+ ld[i + 0.5] = u''
- for i in xrange(i_max + 1):
- if i not in ld: continue # was removed
- process_line(i)
+ for i in xrange(i_max + 1):
+ if i not in ld: continue # was removed
+ process_line(i)
- # Overwrite the list items inplace, extending the list if necessary
- for i, (k, line) in enumerate(sorted(ld.viewitems())):
- try: lines[i] = line
- except IndexError: lines.append(line)
+ # Overwrite the list items inplace, extending the list if necessary
+ for i, (k, line) in enumerate(sorted(ld.viewitems())):
+ try:
+ lines[i] = line
+ except IndexError:
+ lines.append(line)
def skip_override(app, what, name, obj, skip, options):
- if options.get('exclude-members'):
- include_only = set( re.compile(k[3:])
- for k in options['exclude-members'] if k.startswith('rx:') )
- if include_only:
- for pat in include_only:
- if pat.search(name): break
- else: return True
- if what == 'exception':
- return False if name == '__init__'\
- and isinstance(obj, types.UnboundMethodType) else True
- elif what == 'class':
- if name in ['__init__', '__call__']\
- and isinstance(obj, types.UnboundMethodType): return False
- elif getattr(obj, 'im_class', None) is type: return False
- return skip
+ if options.get('exclude-members'):
+ include_only = set(re.compile(k[3:])
+ for k in options['exclude-members'] if k.startswith('rx:'))
+ if include_only:
+ for pat in include_only:
+ if pat.search(name): break
+ else:
+ return True
+ if what == 'exception':
+ return False if name == '__init__' \
+ and isinstance(obj, types.UnboundMethodType) else True
+ elif what == 'class':
+ if name in ['__init__', '__call__'] \
+ and isinstance(obj, types.UnboundMethodType):
+ return False
+ elif getattr(obj, 'im_class', None) is type:
+ return False
+ return skip
+
def setup(app):
- app.connect('autodoc-process-docstring', process_docstring)
- app.connect('autodoc-skip-member', skip_override)
- app.add_config_value('autodoc_dump_rst', None, True)
+ app.connect('autodoc-process-docstring', process_docstring)
+ app.connect('autodoc-skip-member', skip_override)
+ app.add_config_value('autodoc_dump_rst', None, True)
View
133 doc/sphinx_text_to_md.py
@@ -10,71 +10,74 @@ class FormatError(Exception): pass
def main():
- import argparse
- parser = argparse.ArgumentParser(
- description='Convert sphinx-produced autodoc.apidoc text to markdown.')
- parser.add_argument('src', nargs='?', help='Source file (default: use stdin).')
- optz = parser.parse_args()
-
- src = open(optz.src) if optz.src else sys.stdin
- dst = sys.stdout
-
- py_name = r'[\w_\d]+'
- out = ft.partial(print, file=dst)
-
- st_attrdoc = 0
- st_cont, lse_nl = False, None
-
- for line in src:
- ls = line.strip()
- if not ls: # blank line
- out(line, end='')
- continue
-
- line_indent = re.search(r'^( +)', line)
- if not line_indent: line_indent = 0
- else:
- line_indent = len(line_indent.group(1))
- if line_indent % 3: raise FormatError('Weird indent size: {}'.format(line_indent))
- line_indent = line_indent / 3
-
- lp = line.split()
- lse = re.sub(r'(<\S+) at 0x[\da-f]+(>)', r'\1\2', ls)
- lse = re.sub(r'([_*<>])', r'\\\1', lse)
- for url in re.findall(r'\b\w+://\S+', lse):
- lse = lse.replace(url, url.replace(r'\_', '_'))
- lse = re.sub(r'\bu([\'"])', r'\1', lse)
- st_cont, lse_nl = bool(lse_nl), '' if re.search(r'\b\w+://\S+-$', lse) else '\n'
-
- st_attrdoc_reset = True
- if not line_indent:
- if len(lp) > 2 and lp[0] == lp[1]:
- if lp[0] in ('exception', 'class'): # class, exception
- out('\n'*1, end='')
- out('* **{}**'.format(' '.join(lse.split()[1:])))
-
- else:
- raise FormatError('Unhandled: {!r}'.format(line))
-
- elif line_indent == 1:
- if re.search(r'^(\w+ )?{}\('.format(py_name), ls): # function
- out('\n'*1, end='')
- out('{}* {}'.format(' '*4, lse))
- st_attrdoc, st_attrdoc_reset = 8, False
- elif re.search(r'^{}\s+=\s+'.format(py_name), ls): # attribute
- out('{}* {}'.format(' '*4, lse))
- st_attrdoc, st_attrdoc_reset = 8, False
- elif lp[0] == 'Bases:': # class bases
- out('{}{}'.format(' '*4, lse))
- st_attrdoc, st_attrdoc_reset = 4, False
- else: out('{}{}'.format(' '*(4 * st_cont), ls), end=lse_nl) # class docstring
-
- else: # description line
- if ls[0] in '-*': line = '\\' + line.lstrip()
- out('{}{}'.format(' '*(st_attrdoc * st_cont), line.strip()), end=lse_nl)
- st_attrdoc_reset = False
-
- if st_attrdoc and st_attrdoc_reset: st_attrdoc = 0
+ import argparse
+
+ parser = argparse.ArgumentParser(
+ description='Convert sphinx-produced autodoc.apidoc text to markdown.')
+ parser.add_argument('src', nargs='?', help='Source file (default: use stdin).')
+ optz = parser.parse_args()
+
+ src = open(optz.src) if optz.src else sys.stdin
+ dst = sys.stdout
+
+ py_name = r'[\w_\d]+'
+ out = ft.partial(print, file=dst)
+
+ st_attrdoc = 0
+ st_cont, lse_nl = False, None
+
+ for line in src:
+ ls = line.strip()
+ if not ls: # blank line
+ out(line, end='')
+ continue
+
+ line_indent = re.search(r'^( +)', line)
+ if not line_indent:
+ line_indent = 0
+ else:
+ line_indent = len(line_indent.group(1))
+ if line_indent % 3: raise FormatError('Weird indent size: {}'.format(line_indent))
+ line_indent = line_indent / 3
+
+ lp = line.split()
+ lse = re.sub(r'(<\S+) at 0x[\da-f]+(>)', r'\1\2', ls)
+ lse = re.sub(r'([_*<>])', r'\\\1', lse)
+ for url in re.findall(r'\b\w+://\S+', lse):
+ lse = lse.replace(url, url.replace(r'\_', '_'))
+ lse = re.sub(r'\bu([\'"])', r'\1', lse)
+ st_cont, lse_nl = bool(lse_nl), '' if re.search(r'\b\w+://\S+-$', lse) else '\n'
+
+ st_attrdoc_reset = True
+ if not line_indent:
+ if len(lp) > 2 and lp[0] == lp[1]:
+ if lp[0] in ('exception', 'class'): # class, exception
+ out('\n' * 1, end='')
+ out('* **{}**'.format(' '.join(lse.split()[1:])))
+
+ else:
+ raise FormatError('Unhandled: {!r}'.format(line))
+
+ elif line_indent == 1:
+ if re.search(r'^(\w+ )?{}\('.format(py_name), ls): # function
+ out('\n' * 1, end='')
+ out('{}* {}'.format(' ' * 4, lse))
+ st_attrdoc, st_attrdoc_reset = 8, False
+ elif re.search(r'^{}\s+=\s+'.format(py_name), ls): # attribute
+ out('{}* {}'.format(' ' * 4, lse))
+ st_attrdoc, st_attrdoc_reset = 8, False
+ elif lp[0] == 'Bases:': # class bases
+ out('{}{}'.format(' ' * 4, lse))
+ st_attrdoc, st_attrdoc_reset = 4, False
+ else:
+ out('{}{}'.format(' ' * (4 * st_cont), ls), end=lse_nl) # class docstring
+
+ else: # description line
+ if ls[0] in '-*': line = '\\' + line.lstrip()
+ out('{}{}'.format(' ' * (st_attrdoc * st_cont), line.strip()), end=lse_nl)
+ st_attrdoc_reset = False
+
+ if st_attrdoc and st_attrdoc_reset: st_attrdoc = 0
if __name__ == '__main__': main()
View
90 setup.py
@@ -6,50 +6,52 @@
pkg_root = os.path.dirname(__file__)
# Error-handling here is to allow package to be built w/o README included
-try: readme = open(os.path.join(pkg_root, 'README.txt')).read()
-except IOError: readme = ''
+try:
+ readme = open(os.path.join(pkg_root, 'README.txt')).read()
+except IOError:
+ readme = ''
setup(
- name = 'python-skydrive',
- version = '13.01.2',
- author = 'Mike Kazantsev',
- author_email = 'mk.fraggod@gmail.com',
- license = 'WTFPL',
- keywords = 'skydrive api oauth2 rest microsoft cloud live liveconnect',
- url = 'http://github.com/mk-fg/python-skydrive',
-
- description = 'Python and command-line interface'
- ' for Microsoft LiveConnect SkyDrive REST API v5.0',
- long_description = readme,
-
- classifiers = [
- 'Development Status :: 4 - Beta',
- 'Environment :: Console',
- 'Intended Audience :: Developers',
- 'Intended Audience :: System Administrators',
- 'Intended Audience :: Information Technology',
- 'License :: OSI Approved',
- 'Operating System :: OS Independent',
- 'Programming Language :: Python',
- 'Programming Language :: Python :: 2.7',
- 'Programming Language :: Python :: 2 :: Only',
- 'Topic :: Internet',
- 'Topic :: Software Development',
- 'Topic :: System :: Archiving',
- 'Topic :: System :: Filesystems',
- 'Topic :: Utilities' ],
-
- # install_requires = [],
- extras_require = dict(
- standalone=['requests'],
- cli=['PyYAML', 'requests'],
- conf=['PyYAML', 'requests'] ),
-
- packages = find_packages(),
- include_package_data = True,
- package_data = {'': ['README.txt']},
- exclude_package_data = {'': ['README.*']},
-
- entry_points = dict(console_scripts=[
- 'skydrive-cli = skydrive.cli_tool:main' ]) )
+ name='python-skydrive',
+ version='13.03.3',
+ author='Mike Kazantsev',
+ author_email='mk.fraggod@gmail.com',
+ license='WTFPL',
+ keywords='skydrive api oauth2 rest microsoft cloud live liveconnect',
+ url='http://github.com/mk-fg/python-skydrive',
+
+ description='Python and command-line interface'
+ ' for Microsoft LiveConnect SkyDrive REST API v5.0',
+ long_description=readme,
+
+ classifiers=[
+ 'Development Status :: 4 - Beta',
+ 'Environment :: Console',
+ 'Intended Audience :: Developers',
+ 'Intended Audience :: System Administrators',
+ 'Intended Audience :: Information Technology',
+ 'License :: OSI Approved',
+ 'Operating System :: OS Independent',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 2 :: Only',
+ 'Topic :: Internet',
+ 'Topic :: Software Development',
+ 'Topic :: System :: Archiving',
+ 'Topic :: System :: Filesystems',
+ 'Topic :: Utilities'],
+
+ # install_requires = [],
+ extras_require=dict(
+ standalone=['requests'],
+ cli=['PyYAML', 'requests'],
+ conf=['PyYAML', 'requests']),
+
+ packages=find_packages(),
+ include_package_data=True,
+ package_data={'': ['README.txt']},
+ exclude_package_data={'': ['README.*']},
+
+ entry_points=dict(console_scripts=[
+ 'skydrive-cli = skydrive.cli_tool:main']))
View
786 skydrive/api_v5.py
@@ -1,7 +1,6 @@
#-*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function
-
import itertools as it, operator as op, functools as ft
from datetime import datetime, timedelta
from os.path import join, basename
@@ -10,419 +9,426 @@
from .conf import ConfigMixin
import logging
-log = logging.getLogger(__name__)
+log = logging.getLogger(__name__)
class SkyDriveInteractionError(Exception): pass
+
class ProtocolError(SkyDriveInteractionError):
- def __init__(self, code, msg):
- super(ProtocolError, self).__init__(code, msg)
- self.code = code
+ def __init__(self, code, msg):
+ super(ProtocolError, self).__init__(code, msg)
+ self.code = code
+
class AuthenticationError(SkyDriveInteractionError): pass
-class DoesNotExists(SkyDriveInteractionError):
- 'Only raised from SkyDriveAPI.resolve_path().'
+class DoesNotExists(SkyDriveInteractionError):
+ 'Only raised from SkyDriveAPI.resolve_path().'
class SkyDriveHTTPClient(object):
-
- def _requests_tls_workarounds(self, requests):
- # Workaround for TLSv1.2 issue with Microsoft livefilestore.com hosts.
- session = None
-
- if requests.__version__ in ['0.14.1', '0.14.2']:
- # These versions can only be monkey-patched, unfortunately.
- # See README and following related links for details:
- # https://github.com/mk-fg/python-skydrive/issues/1
- # https://github.com/kennethreitz/requests/pull/799
- # https://github.com/kennethreitz/requests/pull/900
- # https://github.com/kennethreitz/requests/issues/1083
- # https://github.com/shazow/urllib3/pull/109
- from requests.packages.urllib3 import connectionpool as cp
- socket, ssl, match_hostname = cp.socket, cp.ssl, cp.match_hostname
- class VerifiedHTTPSConnection(cp.VerifiedHTTPSConnection):
- def connect(self):
- sock = socket.create_connection((self.host, self.port), self.timeout)
- self.sock = ssl.wrap_socket( sock, self.key_file,
- self.cert_file, cert_reqs=self.cert_reqs, ca_certs=self.ca_certs,
- ssl_version=ssl.PROTOCOL_TLSv1 )
- if self.ca_certs: match_hostname(self.sock.getpeercert(), self.host)
- cp.VerifiedHTTPSConnection = VerifiedHTTPSConnection
-
- else:
- version = tuple(it.imap(int, requests.__version__.split('.')))
- if version > (1, 0, 0):
- # No hacks necessary - session HTTPAdapter can be used!
- from requests.packages.urllib3.poolmanager import PoolManager
- from requests.adapters import HTTPAdapter
- import ssl
- class TLSv1Adapter(HTTPAdapter):
- def init_poolmanager(self, connections, maxsize):
- self.poolmanager = PoolManager(
- num_pools=connections, maxsize=maxsize,
- ssl_version=ssl.PROTOCOL_TLSv1 )
- session = requests.Session()
- session.mount('https://', TLSv1Adapter())
-
- requests._skydrive_tls_fixed = True
- return session
-
- def request( self, url, method='get', data=None,
- files=None, raw=False, headers=dict(), raise_for=dict(),
- session=None ):
- '''Make synchronous HTTP request.
- Can be overidden to use different http module (e.g. urllib2, twisted, etc).'''
- import requests # import here to avoid dependency on the module
- if not getattr(requests, '_skydrive_tls_fixed', False):
- # temporary fix for https://github.com/mk-fg/python-skydrive/issues/1
- patched_session = self._requests_tls_workarounds(requests)
- if patched_session is not None:
- self._requests_session = patched_session
-
- if session is None:
- try: session = self._requests_session
- except AttributeError:
- session = self._requests_session = requests.session()
- elif not session: session = requests
-
- method = method.lower()
- kwz, func = dict(), getattr( session, method,
- ft.partial(session.request, method.upper()) )
- if data is not None:
- if method == 'post': kwz['data'] = data
- else:
- kwz['data'] = json.dumps(data)
- headers = headers.copy()
- headers.setdefault('Content-Type', 'application/json')
- if files is not None: kwz['files'] = files
- if headers is not None: kwz['headers'] = headers
- code = None
- try:
- res = func(url, **kwz)
- # log.debug('Response headers: {}'.format(res.headers))
- code = res.status_code
- if code == requests.codes.no_content: return
- if code != requests.codes.ok: res.raise_for_status()
- return json.loads(res.text) if not raw else res.content
- except requests.RequestException as err:
- raise raise_for.get(code, ProtocolError)(code, err.message)
-
+ def _requests_tls_workarounds(self, requests):
+ # Workaround for TLSv1.2 issue with Microsoft livefilestore.com hosts.
+ session = None
+
+ if requests.__version__ in ['0.14.1', '0.14.2']:
+ # These versions can only be monkey-patched, unfortunately.
+ # See README and following related links for details:
+ # https://github.com/mk-fg/python-skydrive/issues/1
+ # https://github.com/kennethreitz/requests/pull/799
+ # https://github.com/kennethreitz/requests/pull/900
+ # https://github.com/kennethreitz/requests/issues/1083
+ # https://github.com/shazow/urllib3/pull/109
+ from requests.packages.urllib3 import connectionpool as cp
+
+ socket, ssl, match_hostname = cp.socket, cp.ssl, cp.match_hostname
+
+ class VerifiedHTTPSConnection(cp.VerifiedHTTPSConnection):
+ def connect(self):
+ sock = socket.create_connection((self.host, self.port), self.timeout)
+ self.sock = ssl.wrap_socket(sock, self.key_file,
+ self.cert_file, cert_reqs=self.cert_reqs, ca_certs=self.ca_certs,
+ ssl_version=ssl.PROTOCOL_TLSv1)
+ if self.ca_certs: match_hostname(self.sock.getpeercert(), self.host)
+
+ cp.VerifiedHTTPSConnection = VerifiedHTTPSConnection
+
+ else:
+ version = tuple(it.imap(int, requests.__version__.split('.')))
+ if version > (1, 0, 0):
+ # No hacks necessary - session HTTPAdapter can be used!
+ from requests.packages.urllib3.poolmanager import PoolManager
+ from requests.adapters import HTTPAdapter
+ import ssl
+
+ class TLSv1Adapter(HTTPAdapter):
+ def init_poolmanager(self, connections, maxsize):
+ self.poolmanager = PoolManager(
+ num_pools=connections, maxsize=maxsize,
+ ssl_version=ssl.PROTOCOL_TLSv1)
+
+ session = requests.Session()
+ session.mount('https://', TLSv1Adapter())
+
+ requests._skydrive_tls_fixed = True
+ return session
+
+ def request( self, url, method='get', data=None,
+ files=None, raw=False, headers=dict(), raise_for=dict(),
+ session=None ):
+ '''Make synchronous HTTP request.
+ Can be overidden to use different http module (e.g. urllib2, twisted, etc).'''
+ import requests # import here to avoid dependency on the module
+
+ if not getattr(requests, '_skydrive_tls_fixed', False):
+ # temporary fix for https://github.com/mk-fg/python-skydrive/issues/1
+ patched_session = self._requests_tls_workarounds(requests)
+ if patched_session is not None:
+ self._requests_session = patched_session
+
+ if session is None:
+ try:
+ session = self._requests_session
+ except AttributeError:
+ session = self._requests_session = requests.session()
+ elif not session:
+ session = requests
+
+ method = method.lower()
+ kwz, func = dict(), getattr(session, method,
+ ft.partial(session.request, method.upper()))
+ if data is not None:
+ if method == 'post':
+ kwz['data'] = data
+ else:
+ kwz['data'] = json.dumps(data)
+ headers = headers.copy()
+ headers.setdefault('Content-Type', 'application/json')
+ if files is not None: kwz['files'] = files
+ if headers is not None: kwz['headers'] = headers
+ code = None
+ try:
+ res = func(url, **kwz)
+ # log.debug('Response headers: {}'.format(res.headers))
+ code = res.status_code
+ if code == requests.codes.no_content: return
+ if code != requests.codes.ok: res.raise_for_status()
+ return json.loads(res.text) if not raw else res.content
+ except requests.RequestException as err:
+ raise raise_for.get(code, ProtocolError)(code, err.message)
class SkyDriveAuth(SkyDriveHTTPClient):
-
- #: Client id/secret should be static on per-application basis.
- #: Can be received from LiveConnect by any registered user at https://manage.dev.live.com/
- #: API ToS can be found at http://msdn.microsoft.com/en-US/library/live/ff765012
- client_id = client_secret = None
-
- auth_url_user = 'https://login.live.com/oauth20_authorize.srf'
- auth_url_token = 'https://login.live.com/oauth20_token.srf'
- auth_scope = 'wl.skydrive', 'wl.skydrive_update', 'wl.offline_access'
- auth_redirect_uri_mobile = 'https://login.live.com/oauth20_desktop.srf'
-
- #: Set by auth_get_token() method, not used internally.
- #: Might be useful for debugging or extension purposes.
- auth_access_expires = auth_access_data_raw = None
-
- #: At least one of auth_code, auth_refresh_token or
- #: auth_access_token should be set before data requests.
- auth_code = auth_refresh_token = auth_access_token = None
-
- #: This (default) redirect_uri is **special** - app must be marked as "mobile" to use it.
- auth_redirect_uri = auth_redirect_uri_mobile
-
-
- def __init__(self, **config):
- 'Initialize API wrapper class with specified properties set.'
- for k, v in config.viewitems():
- try: getattr(self, k)
- except AttributeError:
- raise AttributeError('Unrecognized configuration key: {}'.format(k))
- setattr(self, k, v)
-
-
- def auth_user_get_url(self, scope=None):
- 'Build authorization URL for User Agent.'
- # Note: default redirect_uri is **special**, app must be marked as "mobile" to use it
- if not self.client_id: raise AuthenticationError('No client_id specified')
- return '{}?{}'.format( self.auth_url_user, urllib.urlencode(dict(
- client_id=self.client_id, scope=' '.join(scope or self.auth_scope),
- response_type='code', redirect_uri=self.auth_redirect_uri )) )
-
- def auth_user_process_url(self, url):
- 'Process tokens and errors from redirect_uri.'
- url = urlparse.urlparse(url)
- url_qs = dict(it.chain.from_iterable(
- urlparse.parse_qsl(v) for v in [url.query, url.fragment] ))
- if url_qs.get('error'):
- raise AuthenticationError('{} :: {}'.format(
- url_qs['error'], url_qs.get('error_description') ))
- self.auth_code = url_qs['code']
- return self.auth_code
-
-
- def auth_get_token(self, check_scope=True):
- 'Refresh or acquire access_token.'
- res = self.auth_access_data_raw = self._auth_token_request()
- return self._auth_token_process(res, check_scope=check_scope)
-
- def _auth_token_request(self):
- post_data = dict( client_id=self.client_id,
- client_secret=self.client_secret, redirect_uri=self.auth_redirect_uri )
- if not self.auth_refresh_token:
- log.debug('Requesting new access_token through authorization_code grant')
- post_data.update(code=self.auth_code, grant_type='authorization_code')
- else:
- if self.auth_redirect_uri == self.auth_redirect_uri_mobile:
- del post_data['client_secret'] # not necessary for "mobile" apps
- log.debug('Refreshing access_token')
- post_data.update(
- refresh_token=self.auth_refresh_token, grant_type='refresh_token' )
- post_data_missing_keys = list( k for k in
- ['client_id', 'client_secret', 'code', 'refresh_token', 'grant_type']
- if k in post_data and not post_data[k] )
- if post_data_missing_keys:
- raise AuthenticationError( 'Insufficient authentication'
- ' data provided (missing keys: {})'.format(post_data_missing_keys) )
-
- return self.request(self.auth_url_token, method='post', data=post_data)
-
- def _auth_token_process(self, res, check_scope=True):
- assert res['token_type'] == 'bearer'
- for k in 'access_token', 'refresh_token':
- if k in res: setattr(self, 'auth_{}'.format(k), res[k])
- self.auth_access_expires = None if 'expires_in' not in res\
- else (datetime.utcnow() + timedelta(0, res['expires_in']))
-
- scope_granted = res.get('scope', '').split()
- if check_scope and set(self.auth_scope) != set(scope_granted):
- raise AuthenticationError(
- "Granted scope ({}) doesn't match requested one ({})."\
- .format(', '.join(scope_granted), ', '.join(self.auth_scope)) )
- return scope_granted
-
+ #: Client id/secret should be static on per-application basis.
+ #: Can be received from LiveConnect by any registered user at https://manage.dev.live.com/
+ #: API ToS can be found at http://msdn.microsoft.com/en-US/library/live/ff765012
+ client_id = client_secret = None
+
+ auth_url_user = 'https://login.live.com/oauth20_authorize.srf'
+ auth_url_token = 'https://login.live.com/oauth20_token.srf'
+ auth_scope = 'wl.skydrive', 'wl.skydrive_update', 'wl.offline_access'
+ auth_redirect_uri_mobile = 'https://login.live.com/oauth20_desktop.srf'
+
+ #: Set by auth_get_token() method, not used internally.
+ #: Might be useful for debugging or extension purposes.
+ auth_access_expires = auth_access_data_raw = None
+
+ #: At least one of auth_code, auth_refresh_token or
+ #: auth_access_token should be set before data requests.
+ auth_code = auth_refresh_token = auth_access_token = None
+
+ #: This (default) redirect_uri is **special** - app must be marked as "mobile" to use it.
+ auth_redirect_uri = auth_redirect_uri_mobile
+
+
+ def __init__(self, **config):
+ 'Initialize API wrapper class with specified properties set.'
+ for k, v in config.viewitems():
+ try:
+ getattr(self, k)
+ except AttributeError:
+ raise AttributeError('Unrecognized configuration key: {}'.format(k))
+ setattr(self, k, v)
+
+
+ def auth_user_get_url(self, scope=None):
+ 'Build authorization URL for User Agent.'
+ # Note: default redirect_uri is **special**, app must be marked as "mobile" to use it
+ if not self.client_id: raise AuthenticationError('No client_id specified')
+ return '{}?{}'.format(self.auth_url_user, urllib.urlencode(dict(
+ client_id=self.client_id, scope=' '.join(scope or self.auth_scope),
+ response_type='code', redirect_uri=self.auth_redirect_uri)))
+
+ def auth_user_process_url(self, url):
+ 'Process tokens and errors from redirect_uri.'
+ url = urlparse.urlparse(url)
+ url_qs = dict(it.chain.from_iterable(
+ urlparse.parse_qsl(v) for v in [url.query, url.fragment]))
+ if url_qs.get('error'):
+ raise AuthenticationError('{} :: {}'.format(
+ url_qs['error'], url_qs.get('error_description')))
+ self.auth_code = url_qs['code']
+ return self.auth_code
+
+
+ def auth_get_token(self, check_scope=True):
+ 'Refresh or acquire access_token.'
+ res = self.auth_access_data_raw = self._auth_token_request()
+ return self._auth_token_process(res, check_scope=check_scope)
+
+ def _auth_token_request(self):
+ post_data = dict(client_id=self.client_id,
+ client_secret=self.client_secret, redirect_uri=self.auth_redirect_uri)
+ if not self.auth_refresh_token:
+ log.debug('Requesting new access_token through authorization_code grant')
+ post_data.update(code=self.auth_code, grant_type='authorization_code')
+ else:
+ if self.auth_redirect_uri == self.auth_redirect_uri_mobile:
+ del post_data['client_secret'] # not necessary for "mobile" apps
+ log.debug('Refreshing access_token')
+ post_data.update(
+ refresh_token=self.auth_refresh_token, grant_type='refresh_token')
+ post_data_missing_keys = list(k for k in
+ ['client_id', 'client_secret', 'code', 'refresh_token', 'grant_type']
+ if k in post_data and not post_data[k])
+ if post_data_missing_keys:
+ raise AuthenticationError('Insufficient authentication'
+ ' data provided (missing keys: {})'.format(post_data_missing_keys))
+
+ return self.request(self.auth_url_token, method='post', data=post_data)
+
+ def _auth_token_process(self, res, check_scope=True):
+ assert res['token_type'] == 'bearer'
+ for k in 'access_token', 'refresh_token':
+ if k in res: setattr(self, 'auth_{}'.format(k), res[k])
+ self.auth_access_expires = None if 'expires_in' not in res \
+ else (datetime.utcnow() + timedelta(0, res['expires_in']))
+
+ scope_granted = res.get('scope', '').split()
+ if check_scope and set(self.auth_scope) != set(scope_granted):
+ raise AuthenticationError(
+ "Granted scope ({}) doesn't match requested one ({})." \
+ .format(', '.join(scope_granted), ', '.join(self.auth_scope)))
+ return scope_granted
class SkyDriveAPIWrapper(SkyDriveAuth):
-
- '''Less-biased SkyDrive API wrapper class.
- All calls made here return result of self.request() call directly,
- so it can easily be made async (e.g. return twisted deferred object)
- by overriding http request method in subclass.'''
-
- api_url_base = 'https://apis.live.net/v5.0/'
-
- def _api_url( self, path, query=dict(),
- pass_access_token=True, pass_empty_values=False ):
- query = query.copy()
- if pass_access_token:
- query.setdefault('access_token', self.auth_access_token)
- if not pass_empty_values:
- for k,v in query.viewitems():
- if not v:
- raise AuthenticationError('Empty key {!r} for API call (path: {})'.format(k, path))
- return urlparse.urljoin( self.api_url_base,
- '{}?{}'.format(path, urllib.urlencode(query)) )
-
- def __call__( self, url='me/skydrive', query=dict(),
- query_filter=True, auth_header=False,
- auto_refresh_token=True, **request_kwz ):
- '''Make an arbitrary call to LiveConnect API.
- Shouldn't be used directly under most circumstances.'''
- if query_filter:
- query = dict( (k,v) for k,v in
- query.viewitems() if v is not None )
- if auth_header:
- request_kwz.setdefault('headers', dict())\
- ['Authorization'] = 'Bearer {}'.format(self.auth_access_token)
- kwz = request_kwz.copy()
- kwz.setdefault('raise_for', dict())[401] = AuthenticationError
- api_url = ft.partial( self._api_url,
- url, query, pass_access_token=not auth_header )
- try: return self.request(api_url(), **kwz)
- except AuthenticationError:
- if not auto_refresh_token: raise
- self.auth_get_token()
- if auth_header: # update auth header with a new token
- request_kwz['headers']['Authorization']\
- = 'Bearer {}'.format(self.auth_access_token)
- return self.request(api_url(), **request_kwz)
-
-
- def get_quota(self):
- 'Get SkyDrive object, representing quota.'
- return self('me/skydrive/quota')
-
- def listdir(self, folder_id='me/skydrive', limit=None):
- 'Get SkyDrive object, representing list of objects in a folder.'
- return self(join(folder_id, 'files'), dict(limit=limit))
-
- def info(self, obj_id='me/skydrive'):
- '''Return metadata of a specified object.
- See http://msdn.microsoft.com/en-us/library/live/hh243648.aspx
- for the list and description of metadata keys for each object type.'''
- return self(obj_id)
-
-
- def get(self, obj_id, byte_range=None):
- '''Download and return an file (object) or a specified byte_range from it.
- See HTTP Range header (rfc2616) for possible byte_range formats,
- some examples: "0-499" - byte offsets 0-499 (inclusive), "-500" - final 500 bytes.'''
- kwz = dict()
- if byte_range:
- kwz['headers'] = dict(Range='bytes={}'.format(byte_range))
- return self(join(obj_id, 'content'), dict(download='true'), raw=True, **kwz)
-
- def put(self, path_or_tuple, folder_id='me/skydrive', overwrite=True):
- '''Upload a file (object), possibly overwriting
- (default behavior) a file with the same "name" attribute, if exists.
- First argument can be either path to a local file or tuple of "(name, file)",
- where "file" can be either a file-like object or just a string of bytes.
- overwrite option can be set to False to allow two identically-named
- files or "ChooseNewName" to let SkyDrive derive some similar unique name.
- Behavior of this option mimics underlying API.'''
- if overwrite is not None:
- if overwrite is False: overwrite = 'false'
- elif overwrite in ('true', True): overwrite = None # don't pass it
- elif overwrite != 'ChooseNewName':
- raise ValueError( 'overwrite parameter'
- ' must be True, False or "ChooseNewName".' )
- name, src = (basename(path_or_tuple), open(path_or_tuple))\
- if isinstance(path_or_tuple, types.StringTypes)\
- else (path_or_tuple[0], path_or_tuple[1])
- return self( join(folder_id, 'files'),
- dict(overwrite=overwrite), method='post', files=dict(file=(name, src)) )
-
- def mkdir(self, name=None, folder_id='me/skydrive', metadata=dict()):
- '''Create a folder with a specified "name" attribute.
- folder_id allows to specify a parent folder.
- metadata mapping may contain additional folder properties to pass to an API.'''
- metadata = metadata.copy()
- if name: metadata['name'] = name
- return self(folder_id, data=metadata, method='post', auth_header=True)
-
- def delete(self, obj_id):
- 'Delete specified object.'
- return self(obj_id, method='delete')
-
-
- def info_update(self, obj_id, data):
- '''Update metadata with of a specified object.
- See http://msdn.microsoft.com/en-us/library/live/hh243648.aspx
- for the list of RW keys for each object type.'''
- return self(obj_id, method='put', data=data, auth_header=True)
-
- def link(self, obj_id, link_type='shared_read_link'):
- '''Return a preauthenticated (useable by anyone) link to a specified object.
- Object will be considered "shared" by SkyDrive, even if link is never actually used.
- link_type can be either "embed" (returns html), "shared_read_link" or "shared_edit_link".'''
- assert link_type in ['embed', 'shared_read_link', 'shared_edit_link']
- return self(join(obj_id, link_type), method='get')
-
-
- def copy(self, obj_id, folder_id, move=False):
- '''Copy specified file (object) to a folder with a given ID.
- Well-known folder names (like "me/skydrive") don't seem to work here.
- Folders cannot be copied, this is API limitation.'''
- return self( obj_id,
- method='copy' if not move else 'move',
- data=dict(destination=folder_id), auth_header=True )
-
- def move(self, obj_id, folder_id):
- '''Move specified file (object) to a folder.
- Note that folders cannot be moved, this is API limitation.'''
- return self.copy(obj_id, folder_id, move=True)
-
-
- def comments(self, obj_id):
- 'Get SkyDrive object, representing a list of comments for an object.'
- return self(join(obj_id, 'comments'))
-
- def comment_add(self, obj_id, message):
- 'Add comment message to a specified object.'
- return self( join(obj_id, 'comments'), method='post',
- data=dict(message=message), auth_header=True )
-
- def comment_delete(self, comment_id):
- '''Delete specified comment.
- comment_id can be acquired by listing comments for an object.'''
- return self(comment_id, method='delete')
-
+ '''Less-biased SkyDrive API wrapper class.
+ All calls made here return result of self.request() call directly,
+ so it can easily be made async (e.g. return twisted deferred object)
+ by overriding http request method in subclass.'''
+
+ api_url_base = 'https://apis.live.net/v5.0/'
+
+ def _api_url( self, path, query=dict(),
+ pass_access_token=True, pass_empty_values=False ):
+ query = query.copy()
+ if pass_access_token:
+ query.setdefault('access_token', self.auth_access_token)
+ if not pass_empty_values:
+ for k, v in query.viewitems():
+ if not v:
+ raise AuthenticationError('Empty key {!r} for API call (path: {})'.format(k, path))
+ return urlparse.urljoin(self.api_url_base,
+ '{}?{}'.format(path, urllib.urlencode(query)))
+
+ def __call__( self, url='me/skydrive', query=dict(),
+ query_filter=True, auth_header=False,
+ auto_refresh_token=True, **request_kwz ):
+ '''Make an arbitrary call to LiveConnect API.
+ Shouldn't be used directly under most circumstances.'''
+ if query_filter:
+ query = dict((k, v) for k, v in
+ query.viewitems() if v is not None)
+ if auth_header:
+ request_kwz.setdefault('headers', dict()) \
+ ['Authorization'] = 'Bearer {}'.format(self.auth_access_token)
+ kwz = request_kwz.copy()
+ kwz.setdefault('raise_for', dict())[401] = AuthenticationError
+ api_url = ft.partial(self._api_url,
+ url, query, pass_access_token=not auth_header)
+ try:
+ return self.request(api_url(), **kwz)
+ except AuthenticationError:
+ if not auto_refresh_token: raise
+ self.auth_get_token()
+ if auth_header: # update auth header with a new token
+ request_kwz['headers']['Authorization'] \
+ = 'Bearer {}'.format(self.auth_access_token)
+ return self.request(api_url(), **request_kwz)
+
+
+ def get_quota(self):
+ 'Get SkyDrive object, representing quota.'
+ return self('me/skydrive/quota')
+
+ def listdir(self, folder_id='me/skydrive', limit=None):
+ 'Get SkyDrive object, representing list of objects in a folder.'
+ return self(join(folder_id, 'files'), dict(limit=limit))
+
+ def info(self, obj_id='me/skydrive'):
+ '''Return metadata of a specified object.
+ See http://msdn.microsoft.com/en-us/library/live/hh243648.aspx
+ for the list and description of metadata keys for each object type.'''
+ return self(obj_id)
+
+
+ def get(self, obj_id, byte_range=None):
+ '''Download and return an file (object) or a specified byte_range from it.
+ See HTTP Range header (rfc2616) for possible byte_range formats,
+ some examples: "0-499" - byte offsets 0-499 (inclusive), "-500" - final 500 bytes.'''
+ kwz = dict()
+ if byte_range:
+ kwz['headers'] = dict(Range='bytes={}'.format(byte_range))
+ return self(join(obj_id, 'content'), dict(download='true'), raw=True, **kwz)
+
+ def put(self, path_or_tuple, folder_id='me/skydrive', overwrite=True):
+ '''Upload a file (object), possibly overwriting
+ (default behavior) a file with the same "name" attribute, if exists.
+ First argument can be either path to a local file or tuple of "(name, file)",
+ where "file" can be either a file-like object or just a string of bytes.
+ overwrite option can be set to False to allow two identically-named
+ files or "ChooseNewName" to let SkyDrive derive some similar unique name.
+ Behavior of this option mimics underlying API.'''
+ if overwrite is not None:
+ if overwrite is False:
+ overwrite = 'false'
+ elif overwrite in ('true', True):
+ overwrite = None # don't pass it
+ elif overwrite != 'ChooseNewName':
+ raise ValueError('overwrite parameter'
+ ' must be True, False or "ChooseNewName".')
+ name, src = (basename(path_or_tuple), open(path_or_tuple)) \
+ if isinstance(path_or_tuple, types.StringTypes) \
+ else (path_or_tuple[0], path_or_tuple[1])
+ return self(join(folder_id, 'files'),
+ dict(overwrite=overwrite), method='post', files=dict(file=(name, src)))
+
+ def mkdir(self, name=None, folder_id='me/skydrive', metadata=dict()):
+ '''Create a folder with a specified "name" attribute.
+ folder_id allows to specify a parent folder.
+ metadata mapping may contain additional folder properties to pass to an API.'''
+ metadata = metadata.copy()
+ if name: metadata['name'] = name
+ return self(folder_id, data=metadata, method='post', auth_header=True)
+
+ def delete(self, obj_id):
+ 'Delete specified object.'
+ return self(obj_id, method='delete')
+
+
+ def info_update(self, obj_id, data):
+ '''Update metadata with of a specified object.
+ See http://msdn.microsoft.com/en-us/library/live/hh243648.aspx
+ for the list of RW keys for each object type.'''
+ return self(obj_id, method='put', data=data, auth_header=True)
+
+ def link(self, obj_id, link_type='shared_read_link'):
+ '''Return a preauthenticated (useable by anyone) link to a specified object.
+ Object will be considered "shared" by SkyDrive, even if link is never actually used.
+ link_type can be either "embed" (returns html), "shared_read_link" or "shared_edit_link".'''
+ assert link_type in ['embed', 'shared_read_link', 'shared_edit_link']
+ return self(join(obj_id, link_type), method='get')
+
+
+ def copy(self, obj_id, folder_id, move=False):
+ '''Copy specified file (object) to a folder with a given ID.
+ Well-known folder names (like "me/skydrive") don't seem to work here.
+ Folders cannot be copied, this is API limitation.'''
+ return self(obj_id,
+ method='copy' if not move else 'move',
+ data=dict(destination=folder_id), auth_header=True)
+
+ def move(self, obj_id, folder_id):
+ '''Move specified file (object) to a folder.
+ Note that folders cannot be moved, this is API limitation.'''
+ return self.copy(obj_id, folder_id, move=True)
+
+
+ def comments(self, obj_id):
+ 'Get SkyDrive object, representing a list of comments for an object.'
+ return self(join(obj_id, 'comments'))
+
+ def comment_add(self, obj_id, message):
+ 'Add comment message to a specified object.'
+ return self(join(obj_id, 'comments'), method='post',
+ data=dict(message=message), auth_header=True)
+
+ def comment_delete(self, comment_id):
+ '''Delete specified comment.
+ comment_id can be acquired by listing comments for an object.'''
+ return self(comment_id, method='delete')
class SkyDriveAPI(SkyDriveAPIWrapper):
-
- '''Biased synchronous SkyDrive API interface.
- Adds some derivative convenience methods over SkyDriveAPIWrapper.'''
-
- def resolve_path( self, path,
- root_id='me/skydrive', objects=False ):
- '''Return id (or metadata) of an object, specified by chain
- (iterable or fs-style path string) of "name" attributes of it's ancestors,
- or raises DoesNotExists error.
- Requires a lot of calls to resolve each name in path, so use with care.
- root_id parameter allows to specify path
- relative to some folder_id (default: me/skydrive).'''
- if path:
- if isinstance(path, types.StringTypes):
- if not path.startswith('me/skydrive'):
- path = filter(None, path.split(os.sep))
- else: root_id, path = path, None
- if path:
- try:
- for i, name in enumerate(path):
- root_id = dict(it.imap(
- op.itemgetter('name', 'id'), self.listdir(root_id) ))[name]
- except (KeyError, ProtocolError) as err:
- if isinstance(err, ProtocolError) and err.code != 404: raise
- raise DoesNotExists(root_id, path[i:])
- return root_id if not objects else self.info(root_id)
-
- def get_quota(self):
- 'Return tuple of (bytes_available, bytes_quota).'
- return op.itemgetter('available', 'quota')\
- (super(SkyDriveAPI, self).get_quota())
-
- def listdir(self, folder_id='me/skydrive', type_filter=None, limit=None):
- '''Return a list of objects in the specified folder_id.
- limit is passed to the API, so might be used as optimization.
- type_filter can be set to type (str) or sequence
- of object types to return, post-api-call processing.'''
- lst = super(SkyDriveAPI, self).listdir(folder_id=folder_id, limit=limit)['data']
- if type_filter:
- if isinstance(type_filter, types.StringTypes): type_filter = {type_filter}
- lst = list(obj for obj in lst if obj['type'] in type_filter)
- return lst
-
- def copy(self, obj_id, folder_id, move=False):
- '''Copy specified file (object) to a folder.
- Note that folders cannot be copied, this is API limitation.'''
- if folder_id.startswith('me/skydrive'):
- log.info("Special folder names (like 'me/skydrive') don't"
- " seem to work with copy/move operations, resolving it to id")
- folder_id = self.info(folder_id)['id']
- return super(SkyDriveAPI, self).copy(obj_id, folder_id, move=move)
-
- def comments(self, obj_id):
- 'Get a list of comments (message + metadata) for an object.'
- return super(SkyDriveAPI, self).comments(obj_id)['data']
-
+ '''Biased synchronous SkyDrive API interface.
+ Adds some derivative convenience methods over SkyDriveAPIWrapper.'''
+
+ def resolve_path( self, path,
+ root_id='me/skydrive', objects=False ):
+ '''Return id (or metadata) of an object, specified by chain
+ (iterable or fs-style path string) of "name" attributes of it's ancestors,
+ or raises DoesNotExists error.
+ Requires a lot of calls to resolve each name in path, so use with care.
+ root_id parameter allows to specify path
+ relative to some folder_id (default: me/skydrive).'''
+ if path:
+ if isinstance(path, types.StringTypes):
+ if not path.startswith('me/skydrive'):
+ path = filter(None, path.split(os.sep))
+ else:
+ root_id, path = path, None
+ if path:
+ try:
+ for i, name in enumerate(path):
+ root_id = dict(it.imap(
+ op.itemgetter('name', 'id'), self.listdir(root_id)))[name]
+ except (KeyError, ProtocolError) as err:
+ if isinstance(err, ProtocolError) and err.code != 404: raise
+ raise DoesNotExists(root_id, path[i:])
+ return root_id if not objects else self.info(root_id)
+
+ def get_quota(self):
+ 'Return tuple of (bytes_available, bytes_quota).'
+ return op.itemgetter('available', 'quota') \
+ (super(SkyDriveAPI, self).get_quota())
+
+ def listdir(self, folder_id='me/skydrive', type_filter=None, limit=None):
+ '''Return a list of objects in the specified folder_id.
+ limit is passed to the API, so might be used as optimization.
+ type_filter can be set to type (str) or sequence
+ of object types to return, post-api-call processing.'''
+ lst = super(SkyDriveAPI, self).listdir(folder_id=folder_id, limit=limit)['data']
+ if type_filter:
+ if isinstance(type_filter, types.StringTypes): type_filter = {type_filter}
+ lst = list(obj for obj in lst if obj['type'] in type_filter)
+ return lst
+
+ def copy(self, obj_id, folder_id, move=False):
+ '''Copy specified file (object) to a folder.
+ Note that folders cannot be copied, this is API limitation.'''
+ if folder_id.startswith('me/skydrive'):
+ log.info("Special folder names (like 'me/skydrive') don't"
+ " seem to work with copy/move operations, resolving it to id")
+ folder_id = self.info(folder_id)['id']
+ return super(SkyDriveAPI, self).copy(obj_id, folder_id, move=move)
+
+ def comments(self, obj_id):
+ 'Get a list of comments (message + metadata) for an object.'
+ return super(SkyDriveAPI, self).comments(obj_id)['data']
class PersistentSkyDriveAPI(SkyDriveAPI, ConfigMixin):
+ conf_raise_structure_errors = True
- conf_raise_structure_errors = True
-
- @ft.wraps(SkyDriveAPI.auth_get_token)
- def auth_get_token(self, *argz, **kwz):
- # Wrapped to push new tokens to storage asap.
- ret = super(PersistentSkyDriveAPI, self).auth_get_token(*argz, **kwz)
- self.sync()
- return ret
+ @ft.wraps(SkyDriveAPI.auth_get_token)
+ def auth_get_token(self, *argz, **kwz):
+ # Wrapped to push new tokens to storage asap.
+ ret = super(PersistentSkyDriveAPI, self).auth_get_token(*argz, **kwz)
+ self.sync()
+ return ret
- def __del__(self): self.sync()
+ def __del__(self): self.sync()
View
479 skydrive/cli_tool.py
@@ -2,253 +2,264 @@
#-*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function
-
import itertools as it, operator as op, functools as ft
from os.path import dirname, exists, isdir, join
import os, sys, io, re, yaml, json
-try: from skydrive import api_v5, conf
+try:
+ from skydrive import api_v5, conf
except ImportError:
- # Make sure it works from a checkout
- if isdir(join(dirname(__file__), 'skydrive'))\
- and exists(join(dirname(__file__), 'setup.py')):
- sys.path.insert(0, dirname(__file__))
- from skydrive import api_v5, conf
+ # Make sure it works from a checkout
+ if isdir(join(dirname(__file__), 'skydrive')) \
+ and exists(join(dirname(__file__), 'setup.py')):
+ sys.path.insert(0, dirname(__file__))
+ from skydrive import api_v5, conf
+ else:
+ import api_v5, conf
def print_result(data, file=sys.stdout):
- yaml.safe_dump(data, file, default_flow_style=False)
+ yaml.safe_dump(data, file, default_flow_style=False)
+
+
+def size_units(size,
+ _units=list(reversed(list((u, 2 ** (i * 10))
+ for i, u in enumerate('BKMGT')))) ):
+ for u, u1 in _units:
+ if size > u1: break
+ return size / float(u1), u
-def size_units( size,
- _units = list(reversed(list( (u,2**(i*10))
- for i,u in enumerate('BKMGT') ))) ):
- for u,u1 in _units:
- if size > u1: break
- return size / float(u1), u
def id_match( s,
- _re_id=re.compile(r'^(file|folder)\.[0-9a-f]{16}\.[0-9A-F]{16}!\d+|folder\.[0-9a-f]{16}$') ):
- return s if _re_id.search(s) else None
+ _re_id=re.compile(r'^(file|folder)\.[0-9a-f]{16}\.[0-9A-F]{16}!\d+|folder\.[0-9a-f]{16}$') ):
+ return s if _re_id.search(s) else None
+
def main():
- import argparse
- parser = argparse.ArgumentParser(
- description='Tool to manipulate SkyDrive contents.')
- parser.add_argument('-c', '--config',
- metavar='path', default=conf.ConfigMixin.conf_path_default,
- help='Writable configuration state-file (yaml).'
- ' Used to store authorization_code, access and refresh tokens.'
- ' Should initially contain at least something like "{client: {id: xxx, secret: yyy}}".'
- ' Default: %(default)s')
-
- parser.add_argument('-p', '--path', action='store_true',
- help='Interpret file/folder arguments only as human paths, not ids (default: guess).'
- ' Avoid using such paths if non-unique "name" attributes'
- ' of objects in the same parent folder might be used.')
- parser.add_argument('-i', '--id', action='store_true',
- help='Interpret file/folder arguments only as ids (default: guess).')
-
- parser.add_argument('--debug',
- action='store_true', help='Verbose operation mode.')
-
- cmds = parser.add_subparsers(title='Supported operations')
-
- def add_command(name, **kwz):
- cmd = cmds.add_parser(name, **kwz)
- cmd.set_defaults(call=name)
- return cmd
-
- cmd = add_command('auth', help='Perform user authentication.')
- cmd.add_argument('url', nargs='?',
- help='URL with the authorization_code.')
-
- add_command('quota', help='Print quota information.')
- add_command('recent', help='List recently changed objects.')
-
- cmd = add_command('info', help='Display object metadata.')
- cmd.add_argument('object',
- nargs='?', default='me/skydrive',
- help='Object to get info on (default: %(default)s).')
-
- cmd = add_command('info_set', help='Manipulate object metadata.')
- cmd.add_argument('object',
- help='Object to manipulate metadata for.')
- cmd.add_argument('data',
- help='JSON mapping of values to set'
- ' (example: {"name": "new_file_name.jpg"}).')
-
- cmd = add_command('link', help='Get a link to a file.')
- cmd.add_argument('object', help='Object to get link for.')
- cmd.add_argument('-t', '--type', default='shared_read_link',
- help='Type of link to request. Possible values'
- ' (default: %(default)s): shared_read_link, embed, shared_edit_link.')
-
- cmd = add_command('ls', help='List folder contents.')
- cmd.add_argument('-o', '--objects', action='store_true',
- help='Dump full objects, not just name and id.')
- cmd.add_argument('folder',
- nargs='?', default='me/skydrive',
- help='Folder to list contents of (default: %(default)s).')
-
- cmd = add_command('mkdir', help='Create a folder.')
- cmd.add_argument('name', help='Name of a folder to create.')
- cmd.add_argument('folder',
- nargs='?', default='me/skydrive',
- help='Parent folder (default: %(default)s).')
- cmd.add_argument('-m', '--metadata',
- help='JSON mappings of metadata to set for the created folder.'
- ' Optonal. Example: {"description": "Photos from last trip to Mordor"}')
-
- cmd = add_command('get', help='Download file contents.')
- cmd.add_argument('file', help='File (object) to read.')
- cmd.add_argument('-b', '--byte-range',
- help='Specific range of bytes to read from a file (default: read all).'
- ' Should be specified in rfc2616 Range HTTP header format.'
- ' Examples: 0-499 (start - 499), -500 (end-500 to end).')
-
- cmd = add_command('put', help='Upload a file.')
- cmd.add_argument('file', help='Path to a local file to upload.')
- cmd.add_argument('folder',
- nargs='?', default='me/skydrive',
- help='Folder to put file into (default: %(default)s).')
- cmd.add_argument('-n', '--no-overwrite', action='store_true',
- help='Do not overwrite existing files with the same "name" attribute (visible name).')
-
- cmd = add_command('cp', help='Copy file to a folder.')
- cmd.add_argument('file', help='File (object) to copy.')
- cmd.add_argument('folder',
- nargs='?', default='me/skydrive',
- help='Folder to copy file to (default: %(default)s).')
-
- cmd = add_command('mv', help='Move file to a folder.')
- cmd.add_argument('file', help='File (object) to move.')
- cmd.add_argument('folder',
- nargs='?', default='me/skydrive',
- help='Folder to move file to (default: %(default)s).')
-
- cmd = add_command('rm', help='Remove object (file or folder).')
- cmd.add_argument('object', nargs='+', help='Object(s) to remove.')
-
- cmd = add_command('comments', help='Show comments for a file, object or folder.')
- cmd.add_argument('object', help='Object to show comments for.')
-
- cmd = add_command('comment_add', help='Add comment for a file, object or folder.')
- cmd.add_argument('object', help='Object to add comment for.')
- cmd.add_argument('message', help='Comment message to add.')
-
- cmd = add_command('comment_delete', help='Delete comment from a file, object or folder.')
- cmd.add_argument('comment_id',
- help='ID of the comment to remove (use "comments"'
- ' action to get comment ids along with the messages).')
-
- cmd = add_command('tree',
- help='Show contents of skydrive (or folder) as a tree of file/folder names.'
- ' Note that this operation will have to (separately) request a listing'
- ' of every folder under the specified one, so can be quite slow for large'
- ' number of these.')
- cmd.add_argument('folder',
- nargs='?', default='me/skydrive',
- help='Folder to display contents of (default: %(default)s).')
-
- optz = parser.parse_args()
- if optz.path and optz.id:
- parser.error('--path and --id options cannot be used together.')
-
- import logging
- log = logging.getLogger()
- logging.basicConfig(level=logging.WARNING
- if not optz.debug else logging.DEBUG)
-
- api = api_v5.PersistentSkyDriveAPI.from_conf(optz.config)
- res = xres = None
- resolve_path = ( (lambda s: id_match(s) or api.resolve_path(s))\
- if not optz.path else api.resolve_path ) if not optz.id else lambda obj_id: obj_id
-
- if optz.call == 'auth':
- if not optz.url:
- print( 'Visit the following URL in any web browser (firefox, chrome, safari, etc),\n'
- ' authorize there, confirm access permissions, and paste URL of an empty page\n'
- ' (starting with "https://login.live.com/oauth20_desktop.srf")'
- ' you will get redirected to in the end.' )
- print( 'Alternatively, use the returned (after redirects)'
- ' URL with "{} auth <URL>" command.\n'.format(sys.argv[0]) )
- print('URL to visit: {}\n'.format(api.auth_user_get_url()))
- optz.url = raw_input('URL after last redirect: ').strip()
- if optz.url:
- api.auth_user_process_url(optz.url)
- api.auth_get_token()
- print('API authorization was completed successfully.')
-
- elif optz.call == 'quota':
- df, ds = map(size_units, api.get_quota())
- res = dict(free='{:.1f}{}'.format(*df), quota='{:.1f}{}'.format(*ds))
- elif optz.call == 'recent':
- res = api('me/skydrive/recent_docs')['data']
-
- elif optz.call == 'ls':
- res = list(api.listdir(resolve_path(optz.folder)))
- if not optz.objects: res = map(op.itemgetter('name'), res)
-
- elif optz.call == 'info': res = api.info(resolve_path(optz.object))
- elif optz.call == 'info_set':
- xres = api.info_update(
- resolve_path(optz.object), json.loads(optz.data) )
- elif optz.call == 'link':
- res = api.link(resolve_path(optz.object), optz.type)
-
- elif optz.call == 'comments':
- res = api.comments(resolve_path(optz.object))
- elif optz.call == 'comment_add':
- res = api.comment_add(resolve_path(optz.object), optz.message)
- elif optz.call == 'comment_delete':
- res = api.comment_delete(optz.comment_id)
-
- elif optz.call == 'mkdir':
- xres = api.mkdir( name=optz.name, folder_id=resolve_path(optz.folder),
- metadata=optz.metadata and json.loads(optz.metadata) or dict() )
-
- elif optz.call == 'get':
- sys.stdout.write(api.get(
- resolve_path(optz.file), byte_range=optz.byte_range ))
- sys.stdout.flush()
- elif optz.call == 'put':
- xres = api.put( optz.file,
- resolve_path(optz.folder), overwrite=not optz.no_overwrite )
-
- elif optz.call in ['cp', 'mv']:
- argz = map(resolve_path, [optz.file, optz.folder])
- xres = (api.move if optz.call == 'mv' else api.copy)(*argz)
-
- elif optz.call == 'rm':
- for obj in it.imap(resolve_path, optz.object): xres = api.delete(obj)
-
-
- elif optz.call == 'tree':
- from yaml.dumper import SafeDumper
- class Pairs(list):
- @staticmethod
- def yaml_representer(dumper, data):
- return dumper.represent_mapping('tag:yaml.org,2002:map', data)
- SafeDumper.add_representer(Pairs, Pairs.yaml_representer)
-
- def recurse(obj_id):
- node = Pairs()
- for obj in api.listdir(obj_id):
- node.append(( obj['name'], recurse(obj['id'])\
- if obj['type'] in ['folder', 'album'] else obj['type'] ))
- return node
-
- root_id = resolve_path(optz.folder)
- res = {api.info(root_id)['name']: recurse(root_id)}
-
-
- else: parser.error('Unrecognized command: {}'.format(optz.call))
-
- if res is not None: print_result(res)
- if optz.debug and xres is not None:
- buff = io.BytesIO()
- print_result(xres, file=buff)
- log.debug('Call result:\n{0}\n{1}{0}'.format('-'*20, buff.getvalue()))
+ import argparse
+
+ parser = argparse.ArgumentParser(
+ description='Tool to manipulate SkyDrive contents.')
+ parser.add_argument('-c', '--config',
+ metavar='path', default=conf.ConfigMixin.conf_path_default,
+ help='Writable configuration state-file (yaml).'
+ ' Used to store authorization_code, access and refresh tokens.'
+ ' Should initially contain at least something like "{client: {id: xxx, secret: yyy}}".'
+ ' Default: %(default)s')
+
+ parser.add_argument('-p', '--path', action='store_true',
+ help='Interpret file/folder arguments only as human paths, not ids (default: guess).'
+ ' Avoid using such paths if non-unique "name" attributes'
+ ' of objects in the same parent folder might be used.')
+ parser.add_argument('-i', '--id', action='store_true',
+ help='Interpret file/folder arguments only as ids (default: guess).')
+
+ parser.add_argument('--debug',
+ action='store_true', help='Verbose operation mode.')
+
+ cmds = parser.add_subparsers(title='Supported operations')
+
+ def add_command(name, **kwz):
+ cmd = cmds.add_parser(name, **kwz)
+ cmd.set_defaults(call=name)
+ return cmd
+
+ cmd = add_command('auth', help='Perform user authentication.')
+ cmd.add_argument('url', nargs='?',
+ help='URL with the authorization_code.')
+
+ add_command('quota', help='Print quota information.')
+ add_command('recent', help='List recently changed objects.')
+
+ cmd = add_command('info', help='Display object metadata.')
+ cmd.add_argument('object',
+ nargs='?', default='me/skydrive',
+ help='Object to get info on (default: %(default)s).')
+
+ cmd = add_command('info_set', help='Manipulate object metadata.')
+ cmd.add_argument('object',
+ help='Object to manipulate metadata for.')
+ cmd.add_argument('data',
+ help='JSON mapping of values to set'
+ ' (example: {"name": "new_file_name.jpg"}).')
+
+ cmd = add_command('link', help='Get a link to a file.')
+ cmd.add_argument('object', help='Object to get link for.')
+ cmd.add_argument('-t', '--type', default='shared_read_link',
+ help='Type of link to request. Possible values'
+ ' (default: %(default)s): shared_read_link, embed, shared_edit_link.')
+
+ cmd = add_command('ls', help='List folder contents.')
+ cmd.add_argument('-o', '--objects', action='store_true',
+ help='Dump full objects, not just name and id.')
+ cmd.add_argument('folder',
+ nargs='?', default='me/skydrive',
+ help='Folder to list contents of (default: %(default)s).')
+
+ cmd = add_command('mkdir', help='Create a folder.')
+ cmd.add_argument('name', help='Name of a folder to create.')
+ cmd.add_argument('folder',
+ nargs='?', default='me/skydrive',
+ help='Parent folder (default: %(default)s).')
+ cmd.add_argument('-m', '--metadata',
+ help='JSON mappings of metadata to set for the created folder.'
+ ' Optonal. Example: {"description": "Photos from last trip to Mordor"}')
+
+ cmd = add_command('get', help='Download file contents.')
+ cmd.add_argument('file', help='File (object) to read.')
+ cmd.add_argument('-b', '--byte-range',
+ help='Specific range of bytes to read from a file (default: read all).'
+ ' Should be specified in rfc2616 Range HTTP header format.'
+ ' Examples: 0-499 (start - 499), -500 (end-500 to end).')
+
+ cmd = add_command('put', help='Upload a file.')
+ cmd.add_argument('file', help='Path to a local file to upload.')
+ cmd.add_argument('folder',
+ nargs='?', default='me/skydrive',
+ help='Folder to put file into (default: %(default)s).')
+ cmd.add_argument('-n', '--no-overwrite', action='store_true',
+ help='Do not overwrite existing files with the same "name" attribute (visible name).')
+
+ cmd = add_command('cp', help='Copy file to a folder.')
+ cmd.add_argument('file', help='File (object) to copy.')
+ cmd.add_argument('folder',
+ nargs='?', default='me/skydrive',
+ help='Folder to copy file to (default: %(default)s).')
+
+ cmd = add_command('mv', help='Move file to a folder.')
+ cmd.add_argument('file', help='File (object) to move.')
+ cmd.add_argument('folder',
+ nargs='?', default='me/skydrive',
+ help='Folder to move file to (default: %(default)s).')
+
+ cmd = add_command('rm', help='Remove object (file or folder).')
+ cmd.add_argument('object', nargs='+', help='Object(s) to remove.')
+
+ cmd = add_command('comments', help='Show comments for a file, object or folder.')
+ cmd.add_argument('object', help='Object to show comments for.')
+
+ cmd = add_command('comment_add', help='Add comment for a file, object or folder.')
+ cmd.add_argument('object', help='Object to add comment for.')
+ cmd.add_argument('message', help='Comment message to add.')
+
+ cmd = add_command('comment_delete', help='Delete comment from a file, object or folder.')
+ cmd.add_argument('comment_id',
+ help='ID of the comment to remove (use "comments"'
+ ' action to get comment ids along with the messages).')
+
+ cmd = add_command('tree',
+ help='Show contents of skydrive (or folder) as a tree of file/folder names.'
+ ' Note that this operation will have to (separately) request a listing'
+ ' of every folder under the specified one, so can be quite slow for large'
+ ' number of these.')
+ cmd.add_argument('folder',
+ nargs='?', default='me/skydrive',
+ help='Folder to display contents of (default: %(default)s).')
+
+ optz = parser.parse_args()
+ if optz.path and optz.id:
+ parser.error('--path and --id options cannot be used together.')
+
+ import logging
+
+ log = logging.getLogger()
+ logging.basicConfig(level=logging.WARNING
+ if not optz.debug else logging.DEBUG)
+
+ api = api_v5.PersistentSkyDriveAPI.from_conf(optz.config)
+ res = xres = None
+ resolve_path = ( (lambda s: id_match(s) or api.resolve_path(s)) \
+ if not optz.path else api.resolve_path ) if not optz.id else lambda obj_id: obj_id
+
+ if optz.call == 'auth':
+ if not optz.url:
+ print('Visit the following URL in any web browser (firefox, chrome, safari, etc),\n'
+ ' authorize there, confirm access permissions, and paste URL of an empty page\n'
+ ' (starting with "https://login.live.com/oauth20_desktop.srf")'
+ ' you will get redirected to in the end.')
+ print('Alternatively, use the returned (after redirects)'
+ ' URL with "{} auth <URL>" command.\n'.format(sys.argv[0]))
+ print('URL to visit: {}\n'.format(api.auth_user_get_url()))
+ optz.url = raw_input('URL after last redirect: ').strip()
+ if optz.url:
+ api.auth_user_process_url(optz.url)
+ api.auth_get_token()
+ print('API authorization was completed successfully.')
+
+ elif optz.call == 'quota':
+ df, ds = map(size_units, api.get_quota())
+ res = dict(free='{:.1f}{}'.format(*df), quota='{:.1f}{}'.format(*ds))
+ elif optz.call == 'recent':
+ res = api('me/skydrive/recent_docs')['data']
+
+ elif optz.call == 'ls':
+ res = list(api.listdir(resolve_path(optz.folder)))
+ if not optz.objects: res = map(op.itemgetter('name'), res)
+
+ elif optz.call == 'info':
+ res = api.info(resolve_path(optz.object))
+ elif optz.call == 'info_set':
+ xres = api.info_update(
+ resolve_path(optz.object), json.loads(optz.data))
+ elif optz.call == 'link':
+ res = api.link(resolve_path(optz.object), optz.type)
+
+ elif optz.call == 'comments':
+ res = api.comments(resolve_path(optz.object))
+ elif optz.call == 'comment_add':
+ res = api.comment_add(resolve_path(optz.object), optz.message)
+ elif optz.call == 'comment_delete':
+ res = api.comment_delete(optz.comment_id)
+
+ elif optz.call == 'mkdir':
+ xres = api.mkdir(name=optz.name, folder_id=resolve_path(optz.folder),
+ metadata=optz.metadata and json.loads(optz.metadata) or dict())
+
+ elif optz.call == 'get':
+ sys.stdout.write(api.get(
+ resolve_path(optz.file), byte_range=optz.byte_range))
+ sys.stdout.flush()
+ elif optz.call == 'put':
+ xres = api.put(optz.file,
+ resolve_path(optz.folder), overwrite=not optz.no_overwrite)
+
+ elif optz.call in ['cp', 'mv']:
+ argz = map(resolve_path, [optz.file, optz.folder])
+ xres = (api.move if optz.call == 'mv' else api.copy)(*argz)
+
+ elif optz.call == 'rm':
+ for obj in it.imap(resolve_path, optz.object): xres = api.delete(obj)
+
+
+ elif optz.call == 'tree':
+ from yaml.dumper import SafeDumper
+
+ class Pairs(list):
+ @staticmethod
+ def yaml_representer(dumper, data):
+ return dumper.represent_mapping('tag:yaml.org,2002:map', data)
+
+ SafeDumper.add_representer(Pairs, Pairs.yaml_representer)
+
+ def recurse(obj_id):
+ node = Pairs()
+ for obj in api.listdir(obj_id):
+ node.append(( obj['name'], recurse(obj['id']) \
+ if obj['type'] in ['folder', 'album'] else obj['type'] ))
+ return node
+
+ root_id = resolve_path(optz.folder)
+ res = {api.info(root_id)['name']: recurse(root_id)}
+
+
+ else:
+ parser.error('Unrecognized command: {}'.format(optz.call))
+
+ if res is not None: print_result(res)
+ if optz.debug and xres is not None:
+ buff = io.BytesIO()
+ print_result(xres, file=buff)
+ log.debug('Call result:\n{0}\n{1}{0}'.format('-' * 20, buff.getvalue()))
if __name__ == '__main__': main()
View
227 skydrive/conf.py
@@ -1,124 +1,127 @@
#-*- coding: utf-8 -*-
from __future__ import unicode_literals, print_function
-
import itertools as it, operator as op, functools as ft
import os, sys, io, errno, tempfile, fcntl, stat
from os.path import dirname, basename
import logging
+
log = logging.getLogger(__name__)
class ConfigMixin(object):
-
- #: Path to configuration file to use in from_conf() by default.
- conf_path_default = b'~/.lcrc'
-
- #: If set to some path, updates will be written back to it.
- conf_save = False
-
- #: Raise human-readable errors on structure issues,
- #: which assume that there is an user-accessible configuration file
- conf_raise_structure_errors = False
-
- #: Hierarchical list of keys to write back
- #: to configuration file (preserving the rest) on updates.
- conf_update_keys = dict(
- client={'id', 'secret'},
- auth={'code', 'refresh_token', 'access_expires', 'access_token'} )
-
-
- def __init__(self, **kwz):
- raise NotImplementedError('Init should be overidden with something configurable')
-
-
- @classmethod
- def from_conf(cls, path=None, **overrides):
- '''Initialize instance from YAML configuration file,
- writing updates (only to keys, specified by "conf_update_keys") back to it.'''
- import yaml
-
- if path is None:
- path = cls.conf_path_default
- log.debug('Using default state-file path: {}'.format(path))
- path = os.path.expanduser(path)
- with open(path) as src:
- fcntl.lockf(src, fcntl.LOCK_SH)
- conf = yaml.load(src.read())
- conf.setdefault('conf_save', path)
-
- conf_cls = dict()
- for ns, keys in cls.conf_update_keys.viewitems():
- for k in keys:
- try: v = conf.get(ns, dict()).get(k)
- except AttributeError:
- if not cls.conf_raise_structure_errors: raise
- raise KeyError( 'Unable to get value for configuration parameter'
- ' "{k}" in section "{ns}", check configuration file (path: {path}) syntax'
- ' near the aforementioned section/value.'.format(ns=ns, k=k, path=path) )
- if v is not None:
- conf_cls['{}_{}'.format(ns, k)] = conf[ns][k]
- conf_cls.update(overrides)
-
- self = cls(**conf_cls)
- self.conf_save = conf['conf_save']
- return self
-
-
- def sync(self):
- if not self.conf_save: return
- import yaml
-
- retry = False
- with open(self.conf_save, 'r+') as src:
- fcntl.lockf(src, fcntl.LOCK_SH)
- conf_raw = src.read()
- conf = yaml.load(io.BytesIO(conf_raw)) if conf_raw else dict()
-
- conf_updated = False
- for ns, keys in self.conf_update_keys.viewitems():
- for k in keys:
- v = getattr(self, '{}_{}'.format(ns, k), None)
- if isinstance(v, unicode): v = v.encode('utf-8')
- if v != conf.get(ns, dict()).get(k):
- # log.debug(
- # 'Different val ({}.{}): {!r} != {!r}'\
- # .format(ns, k, v, conf.get(ns, dict()).get(k)) )
- conf.setdefault(ns, dict())[k] = v
- conf_updated = True
-
- if conf_updated:
- log.debug('Updating configuration file ({})'.format(src.name))
- with tempfile.NamedTemporaryFile(
- prefix='{}.'.format(basename(self.conf_save)),
- dir=dirname(self.conf_save), delete=False ) as tmp:
- try:
- fcntl.lockf(tmp, fcntl.LOCK_EX)
- yaml.safe_dump(conf, tmp, default_flow_style=False)
- tmp.flush()
- os.fchmod( tmp.fileno(),
- stat.S_IMODE(os.fstat(src.fileno()).st_mode) )
-
- fcntl.lockf(src, fcntl.LOCK_EX)
- src.seek(0)
- if src.read() != conf_raw: retry = True
- else:
- # Atomic update
- os.rename(tmp.name, src.name)
-
- # Non-atomic update for pids that already have fd to old file,
- # but (presumably) are waiting for the write-lock to be released
- src.seek(0), tmp.seek(0)
- src.truncate()
- src.write(tmp.read())
- src.flush()
-
- finally:
- try: os.unlink(tmp.name)
- except OSError: pass
-
- if retry:
- log.debug(( 'Configuration file ({}) was changed'
- ' during merge, restarting merge' ).format(self.conf_save))
- return self.sync()
+ #: Path to configuration file to use in from_conf() by default.
+ conf_path_default = b'~/.lcrc'
+
+ #: If set to some path, updates will be written back to it.
+ conf_save = False
+
+ #: Raise human-readable errors on structure issues,
+ #: which assume that there is an user-accessible configuration file
+ conf_raise_structure_errors = False
+
+ #: Hierarchical list of keys to write back
+ #: to configuration file (preserving the rest) on updates.
+ conf_update_keys = dict(
+ client={'id', 'secret'},
+ auth={'code', 'refresh_token', 'access_expires', 'access_token'})
+
+
+ def __init__(self, **kwz):
+ raise NotImplementedError('Init should be overidden with something configurable')
+
+
+ @classmethod
+ def from_conf(cls, path=None, **overrides):
+ '''Initialize instance from YAML configuration file,
+ writing updates (only to keys, specified by "conf_update_keys") back to it.'''
+ import yaml
+
+ if path is None:
+ path = cls.conf_path_default
+ log.debug('Using default state-file path: {}'.format(path))
+ path = os.path.expanduser(path)
+ with open(path) as src:
+ fcntl.lockf(src, fcntl.LOCK_SH)
+ conf = yaml.load(src.read())
+ conf.setdefault('conf_save', path)
+
+ conf_cls = dict()
+ for ns, keys in cls.conf_update_keys.viewitems():
+ for k in keys:
+ try:
+ v = conf.get(ns, dict()).get(k)
+ except AttributeError:
+ if not cls.conf_raise_structure_errors: raise
+ raise KeyError('Unable to get value for configuration parameter'
+ ' "{k}" in section "{ns}", check configuration file (path: {path}) syntax'
+ ' near the aforementioned section/value.'.format(ns=ns, k=k, path=path))
+ if v is not None:
+ conf_cls['{}_{}'.format(ns, k)] = conf[ns][k]
+ conf_cls.update(overrides)
+
+ self = cls(**conf_cls)
+ self.conf_save = conf['conf_save']
+ return self
+
+
+ def sync(self):
+ if not self.conf_save: return
+ import yaml
+
+ retry = False
+ with open(self.conf_save, 'r+') as src:
+ fcntl.lockf(src, fcntl.LOCK_SH)
+ conf_raw = src.read()
+ conf = yaml.load(io.BytesIO(conf_raw)) if conf_raw else dict()
+
+ conf_updated = False
+ for ns, keys in self.conf_update_keys.viewitems():
+ for k in keys:
+ v = getattr(self, '{}_{}'.format(ns, k), None)
+ if isinstance(v, unicode): v = v.encode('utf-8')
+ if v != conf.get(ns, dict()).get(k):
+ # log.debug(
+ # 'Different val ({}.{}): {!r} != {!r}'\
+ # .format(ns, k, v, conf.get(ns, dict()).get(k)) )
+ conf.setdefault(ns, dict())[k] = v
+ conf_updated = True
+
+ if conf_updated:
+ log.debug('Updating configuration file ({})'.format(src.name))
+ with tempfile.NamedTemporaryFile(
+ prefix='{}.'.format(basename(self.conf_save)),
+ dir=dirname(self.conf_save), delete=False) as tmp:
+ try:
+ fcntl.lockf(tmp, fcntl.LOCK_EX)
+ yaml.safe_dump(conf, tmp, default_flow_style=False)
+ tmp.flush()
+ os.fchmod(tmp.fileno(),
+ stat.S_IMODE(os.fstat(src.fileno()).st_mode))
+
+ fcntl.lockf(src, fcntl.LOCK_EX)
+ src.seek(0)
+ if src.read() != conf_raw:
+ retry = True
+ else:
+ # Atomic update
+ os.rename(tmp.name, src.name)
+
+ # Non-atomic update for pids that already have fd to old file,
+ # but (presumably) are waiting for the write-lock to be released
+ src.seek(0), tmp.seek(0)
+ src.truncate()
+ src.write(tmp.read())
+ src.flush()
+
+ finally:
+ try:
+ os.unlink(tmp.name)
+ except OSError:
+ pass
+
+ if retry:
+ log.debug(( 'Configuration file ({}) was changed'
+ ' during merge, restarting merge' ).format(self.conf_save))
+ return self.sync()

0 comments on commit a2bbccd

Please sign in to comment.