Commit

Various fixes/improvements.
tkem committed Mar 4, 2016
1 parent 0a03c87 commit c82d0f3
Showing 14 changed files with 515 additions and 504 deletions.
7 changes: 0 additions & 7 deletions .travis.yml
@@ -5,13 +5,6 @@ language: python
python:
- "2.7_with_system_site_packages"

addons:
apt:
sources:
- mopidy-stable
packages:
- mopidy

env:
- TOX_ENV=py27
- TOX_ENV=flake8
27 changes: 12 additions & 15 deletions mopidy_podcast/__init__.py
@@ -2,11 +2,15 @@

import os

from mopidy import config, ext, httpclient
from mopidy import config, exceptions, ext, httpclient

__version__ = '2.0.0'


class BackendError(exceptions.BackendError):
pass


class Extension(ext.Extension):

dist_name = 'Mopidy-Podcast'
@@ -20,9 +24,8 @@ def get_config_schema(self):
schema = super(Extension, self).get_config_schema()
schema['feeds'] = config.List(optional=True)
schema['import_dir'] = config.Path(optional=True)
schema['update_interval'] = config.Integer(minimum=3600)
schema['update_interval'] = config.Integer(minimum=60)
schema['browse_order'] = config.String(choices=['asc', 'desc'])
schema['lookup_order'] = config.String(choices=['asc', 'desc'])
schema['search_limit'] = config.Integer(optional=True, minimum=1)
schema['cache_size'] = config.Integer(minimum=1)
schema['cache_ttl'] = config.Integer(minimum=1)
@@ -41,17 +44,11 @@ def setup(self, registry):
registry.add('backend', PodcastBackend)

@classmethod
def get_url_opener(cls, config):
import urllib2
def get_requests_session(cls, config):
import requests
session = requests.Session()
proxy = httpclient.format_proxy(config['proxy'])
if proxy:
opener = urllib2.build_opener(
urllib2.ProxyHandler({'http': proxy, 'https': proxy})
)
else:
opener = urllib2.build_opener()
session.proxies.update({'http': proxy, 'https': proxy})
name = '%s/%s' % (cls.dist_name, cls.version)
opener.addheaders = [
('User-agent', httpclient.format_user_agent(name))
]
return opener
session.headers['User-Agent'] = httpclient.format_user_agent(name)
return session
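
For context, here is a minimal usage sketch of the new get_requests_session() helper. This is a hedged example: the config dict and feed URL are illustrative, and in practice Mopidy passes its parsed configuration in for you.

    from mopidy_podcast import Extension

    # Illustrative proxy settings; an empty hostname means no proxy is configured.
    config = {
        'proxy': {'scheme': 'http', 'hostname': '', 'port': 0},
    }

    session = Extension.get_requests_session(config)
    # The session now carries the extension's User-Agent header
    # (and proxy settings, when a proxy is configured).
    response = session.get('https://example.com/feed.xml', timeout=10)
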
160 changes: 97 additions & 63 deletions mopidy_podcast/backend.py
@@ -1,146 +1,178 @@
from __future__ import unicode_literals

import contextlib
import datetime
import logging
import os
import threading
import xml.etree.ElementTree

import cachetools

from mopidy import backend

import pykka

from . import Extension, opml, rss, schema
from . import BackendError, Extension, rss, schema
from .library import PodcastLibraryProvider
from .playback import PodcastPlaybackProvider

logger = logging.getLogger(__name__)


def parse_opml(path):
# http://dev.opml.org/spec2.html
root = xml.etree.ElementTree.parse(path).getroot()
for e in root.findall('./body//outline[@type="rss"]'):
url = e.get('xmlUrl')
if url:
yield url
else:
logger.warning('Found RSS outline without xmlUrl in %s', path)
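
As a quick illustration of the parse_opml() helper above, here is a hedged sketch; the OPML document is made up, and it leans on the fact that xml.etree.ElementTree.parse() accepts file objects as well as paths:

    import io

    OPML = b"""<?xml version="1.0" encoding="UTF-8"?>
    <opml version="2.0">
      <body>
        <outline type="rss" text="Example" xmlUrl="http://example.com/feed.xml"/>
      </body>
    </opml>
    """

    # parse_opml() yields the xmlUrl of every outline whose type is "rss".
    print(list(parse_opml(io.BytesIO(OPML))))
    # -> ['http://example.com/feed.xml']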


def stream(session, url, **kwargs):
response = session.get(url, stream=True, **kwargs)
response.raise_for_status()
response.raw.decode_content = True
return contextlib.closing(response)


class PodcastCache(cachetools.TTLCache):

pykka_traversable = True

def __init__(self, config):
# TODO: missing deprecated in cachetools v1.2
# TODO: "missing" parameter will be deprecated in cachetools v1.2
super(PodcastCache, self).__init__(
maxsize=config[Extension.ext_name]['cache_size'],
ttl=config[Extension.ext_name]['cache_ttl'],
missing=self.__missing
)
self.__opener = Extension.get_url_opener(config)
self.__session = Extension.get_requests_session(config)
self.__timeout = config[Extension.ext_name]['timeout']

def __missing(self, feedurl):
logger.debug('Podcast cache miss: %s', feedurl)
with contextlib.closing(self.__open(feedurl)) as source:
podcast = rss.parse(source)
def __missing(self, url):
with stream(self.__session, url, timeout=self.__timeout) as r:
podcast = rss.parse(r.raw, url)
logger.debug('Retrieving %s took %s', url, r.elapsed)
return podcast

def __open(self, url):
return self.__opener.open(url, timeout=self.__timeout)
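
The cache above builds on cachetools' TTLCache with a "missing" callback: a lookup that misses invokes the callback and stores its result under the key. A minimal standalone sketch of that pattern, assuming cachetools 1.x (the callback is slated for deprecation, as the TODO notes); fetch() and the URL are placeholders:

    import cachetools

    def fetch(url):
        # Placeholder for the real download and rss.parse() step.
        return 'podcast for %s' % url

    cache = cachetools.TTLCache(maxsize=64, ttl=3600, missing=fetch)
    cache['http://example.com/feed.xml']  # miss: fetch() is called, result is cached
    cache['http://example.com/feed.xml']  # hit: served from cache until the TTL expires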


class PodcastIndexer(pykka.ThreadingActor):
class PodcastUpdateActor(pykka.ThreadingActor):

def __init__(self, dbpath, config, backend):
super(PodcastIndexer, self).__init__()
super(PodcastUpdateActor, self).__init__()
self.__dbpath = dbpath
self.__backend = backend.actor_ref.proxy()
self.__import_dir = config[Extension.ext_name]['import_dir']
if self.__import_dir is None:
# https://github.com/mopidy/mopidy/issues/1466
try:
self.__import_dir = Extension.get_config_dir(config)
except Exception as e:
logger.error('Cannot create podcast directory: %s', e)
logger.error('Cannot create podcast import directory: %s', e)
self.__feeds = frozenset(config[Extension.ext_name]['feeds'])
self.__opener = Extension.get_url_opener(config)
self.__timer = threading.Timer(0, self.refresh) # initial timeout 0
self.__session = Extension.get_requests_session(config)
self.__timeout = config[Extension.ext_name]['timeout']
self.__timer = threading.Timer(0, self.refresh) # initial zero timeout
self.__update_interval = config[Extension.ext_name]['update_interval']
self.__backend = backend.actor_ref.proxy()
self.__update_started = None
self.__proxy = self.actor_ref.proxy()

def on_start(self):
logger.debug('Starting %s', self.__class__.__name__)
self.__timer.start()

def on_stop(self):
logger.debug('Stopping %s', self.__class__.__name__)
self.__timer.cancel()

def refresh(self):
# TODO: guard/lock while refreshing; keep timestamp for logging
self.__timer = threading.Timer(self.__update_interval, self.refresh)
logger.info('Refreshing %s', Extension.dist_name)
feeds = tuple(self.__feeds.union(self.__scan_import_dir()))
def prepare_update(self, feeds):
try:
with schema.connect(self.__dbpath) as connection:
schema.cleanup(connection, feeds)
except Exception as e:
logger.error('Error refreshing %s: %s', Extension.dist_name, e)
for uri, _ in schema.list(connection):
if uri not in feeds:
schema.delete(connection, uri)
except Exception:
logger.exception('Error refreshing %s', Extension.dist_name)
self.__update_started = None
else:
self.__proxy.update(feeds)
self.__timer.start() # try again next time

def refresh(self):
timer = self.__timer
self.__timer = threading.Timer(self.__update_interval, self.refresh)
timer.cancel() # in case of manual refresh
# prevent multiple concurrent updates
if self.__update_started:
logger.debug('Already refreshing %s', Extension.dist_name)
else:
self.__update_started = datetime.datetime.now()
feeds = tuple(self.__feeds.union(self.__scan_import_dir()))
logger.info('Refreshing %d podcast(s)', len(feeds))
self.__proxy.prepare_update(feeds)
self.__timer.start()

def update(self, feeds):
if feeds:
head, tail = feeds[0], feeds[1:]
self.__update(head)
self.__proxy.update(tail)
else:
logger.debug('Refreshing %s done', Extension.dist_name)

def __update(self, feedurl):
try:
podcast = self.__fetch(feedurl)
except pykka.ActorDeadError as e:
logger.debug('Stopped while retrieving %s: %s', feedurl, e)
except Exception as e:
logger.error('Error retrieving podcast %s: %s', feedurl, e)
try:
self.__update(head)
except Exception:
logger.exception('Error refreshing %s', Extension.ext_name)
self.__update_started = None
else:
self.__proxy.update(tail)
else:
with schema.connect(self.__dbpath) as connection:
schema.update(connection, podcast)
d = datetime.datetime.now() - self.__update_started
logger.info('Refreshing %s took %s', Extension.dist_name, d)
self.__update_started = None

def __fetch(self, feedurl):
podcasts = self.__backend.podcasts
podcast = podcasts.get(feedurl).get()
if podcast is None:
logger.debug('Retrieving podcast %s', feedurl)
# running in the background, no timeout necessary
with contextlib.closing(self.__opener.open(feedurl)) as source:
podcast = rss.parse(source)
# TODO: If-Modified-Since with schema.pubdate(feedurl)?
with stream(self.__session, feedurl, timeout=self.__timeout) as r:
podcast = rss.parse(r.raw, feedurl)
logger.debug('Retrieving %s took %s', feedurl, r.elapsed)
podcast = podcasts.setdefault(feedurl, podcast).get()
return podcast

def __scan_import_dir(self):
result = []
for entry in os.listdir(self.__import_dir):
path = os.path.join(self.__import_dir, entry)
if not os.path.isfile(path):
continue
if not path.endswith(b'.opml'):
continue
try:
feedurls = self.__parse_file(path)
if not os.path.isfile(path):
continue
elif path.endswith(b'.opml'):
urls = parse_opml(path)
else:
logger.debug('Skipping unknown file %s', path)
except Exception as e:
logger.error('Error parsing %s: %s', path, e)
else:
result.extend(feedurls)
result.extend(urls)
return result

def __parse_file(self, path):
with open(path) as fh:
outlines = opml.parse(fh)
for outline in outlines:
if outline.get('type') == 'rss':
yield outline['xmlUrl']
def __update(self, feedurl):
try:
podcast = self.__fetch(feedurl)
except pykka.ActorDeadError:
logger.debug('Stopped while retrieving %s', feedurl)
except Exception as e:
logger.warning('Cannot update podcast %s: %s', feedurl, e)
else:
with schema.connect(self.__dbpath) as connection:
schema.update(connection, podcast)


class PodcastBackend(pykka.ThreadingActor, backend.Backend):

uri_schemes = [
'podcast',
'podcast+file',
'podcast+ftp',
'podcast+http',
'podcast+https'
]
@@ -149,17 +181,19 @@ def __init__(self, config, audio):
super(PodcastBackend, self).__init__()
# create/update database schema on startup to catch errors early
dbpath = os.path.join(Extension.get_data_dir(config), b'feeds.db')
with schema.connect(dbpath) as connection:
schema.init(connection)
try:
with schema.connect(dbpath) as connection:
schema.init(connection)
except Exception as e:
raise BackendError('Error initializing database: %s' % e)
self.library = PodcastLibraryProvider(dbpath, config, backend=self)
self.playback = PodcastPlaybackProvider(audio=audio, backend=self)
self.podcasts = PodcastCache(config)
# passed to PodcastIndexer.start()
self.__config = config
self.__dbpath = dbpath
# passed to PodcastUpdateActor.start()
self.__update_args = [dbpath, config, self]

def on_start(self):
self.indexer = PodcastIndexer.start(self.__dbpath, self.__config, self)
self.indexer = PodcastUpdateActor.start(*self.__update_args)

def on_stop(self):
self.indexer.stop()
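
One detail worth calling out: the update actor walks the feed list one entry per message, re-sending the remaining feeds to itself through its own proxy, so the actor stays responsive between feeds and a stop request does not have to wait for a full refresh. A simplified, hedged sketch of that pattern (the class name and URLs are made up):

    import pykka

    class UpdateSketch(pykka.ThreadingActor):

        def __init__(self):
            super(UpdateSketch, self).__init__()
            self.__proxy = self.actor_ref.proxy()

        def update(self, feeds):
            if feeds:
                head, tail = feeds[0], feeds[1:]
                print('updating %s' % head)  # stand-in for the real per-feed work
                self.__proxy.update(tail)    # re-queue the rest as a new message

    # ref = UpdateSketch.start()
    # ref.proxy().update(('http://example.com/a.xml', 'http://example.com/b.xml'))
    # ref.stop()
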
21 changes: 12 additions & 9 deletions mopidy_podcast/ext.conf
@@ -1,26 +1,29 @@
[podcast]
enabled = true

# an optional list of podcast RSS feed URLs to subscribe to; URLs need
# to be seperated by commas or newlines
# optional list of podcast RSS feed URLs to subscribe to; URLs must be
# separated with commas or newlines
feeds =

# optional path to directory containing OPML files for import; uses
# extension config dir if not set
import_dir =

# directory update interval in seconds
# directory/index update interval in seconds
update_interval = 86400

# sort podcast episodes by ascending (asc) or descending (desc)
# publication date when browsing
browse_order = desc

lookup_order = asc

# maximum number of search results
search_limit = 20

# number of podcasts to cache
# maximum number of podcasts to cache in memory
cache_size = 64

# cache time-to-live in seconds
cache_ttl = 3600
# cache time-to-live in seconds; should be <= update_interval
cache_ttl = 86400

# request timeout in seconds
# HTTP request/database connection timeout in seconds
timeout = 10
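
For reference, a hedged example of how a user might override these defaults in their own mopidy.conf; the feed URLs are purely illustrative:

    [podcast]
    feeds =
        https://example.com/weekly.xml
        https://example.com/daily.xml
    # refresh twice a day and keep cached feeds no longer than that
    update_interval = 43200
    cache_ttl = 43200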