From 1fff6a2998bf26bb0d80e0285f4d7cc98ea2167d Mon Sep 17 00:00:00 2001 From: Allen Wittenauer Date: Sat, 8 Jul 2023 18:58:26 -0700 Subject: [PATCH] Some discogs fixes and adding wikimedia support (#879) --- nowplaying.pyproject | 1 + nowplaying/artistextras/discogs.py | 58 +- nowplaying/artistextras/wikimedia.py | 85 ++ nowplaying/config.py | 17 +- nowplaying/musicbrainz.py | 8 +- .../resources/artistextras_wikimedia_ui.ui | 168 ++++ nowplaying/settingsui.py | 25 +- nowplaying/vendor/bin.pyi | 1 + nowplaying/vendor/bin/wptool | 8 + nowplaying/vendor/wptools.pyi | 1 + nowplaying/vendor/wptools/LICENSE | 22 + nowplaying/vendor/wptools/__init__.py | 33 + nowplaying/vendor/wptools/__main__.py | 5 + nowplaying/vendor/wptools/category.py | 175 ++++ nowplaying/vendor/wptools/core.py | 320 ++++++++ nowplaying/vendor/wptools/page.py | 745 ++++++++++++++++++ nowplaying/vendor/wptools/query.py | 423 ++++++++++ nowplaying/vendor/wptools/restbase.py | 205 +++++ nowplaying/vendor/wptools/site.py | 235 ++++++ nowplaying/vendor/wptools/utils.py | 342 ++++++++ nowplaying/vendor/wptools/wikidata.py | 388 +++++++++ nowplaying/vendor/wptools/wptool.py | 225 ++++++ pyproject.toml | 1 + requirements-run.txt | 6 + vendor.txt | 1 + 25 files changed, 3449 insertions(+), 49 deletions(-) create mode 100644 nowplaying/artistextras/wikimedia.py create mode 100644 nowplaying/resources/artistextras_wikimedia_ui.ui create mode 100644 nowplaying/vendor/bin.pyi create mode 100755 nowplaying/vendor/bin/wptool create mode 100644 nowplaying/vendor/wptools.pyi create mode 100644 nowplaying/vendor/wptools/LICENSE create mode 100644 nowplaying/vendor/wptools/__init__.py create mode 100644 nowplaying/vendor/wptools/__main__.py create mode 100644 nowplaying/vendor/wptools/category.py create mode 100644 nowplaying/vendor/wptools/core.py create mode 100644 nowplaying/vendor/wptools/page.py create mode 100644 nowplaying/vendor/wptools/query.py create mode 100644 nowplaying/vendor/wptools/restbase.py create mode 100644 nowplaying/vendor/wptools/site.py create mode 100644 nowplaying/vendor/wptools/utils.py create mode 100644 nowplaying/vendor/wptools/wikidata.py create mode 100644 nowplaying/vendor/wptools/wptool.py diff --git a/nowplaying.pyproject b/nowplaying.pyproject index e8f08717..982b9c6c 100644 --- a/nowplaying.pyproject +++ b/nowplaying.pyproject @@ -5,6 +5,7 @@ "nowplaying/resources/artistextras_fanarttv_ui.ui", "nowplaying/resources/artistextras_theaudiodb_ui.ui", "nowplaying/resources/artistextras_ui.ui", + "nowplaying/resources/artistextras_wikimedia_ui.ui", "nowplaying/resources/discordbot_ui.ui", "nowplaying/resources/filter_ui.ui", "nowplaying/resources/general_ui.ui", diff --git a/nowplaying/artistextras/discogs.py b/nowplaying/artistextras/discogs.py index 56034a50..2b560115 100755 --- a/nowplaying/artistextras/discogs.py +++ b/nowplaying/artistextras/discogs.py @@ -37,36 +37,47 @@ def _setup_client(self): return True return False - def _find_discogs_artist_releaselist(self, metadata): + def _find_discogs_website(self, metadata): + ''' use websites listing to find discogs entries ''' + + artistname = metadata['artist'] + if not self.client and not self._setup_client(): + return artistname + + if not self.client or not metadata.get('artistwebsites'): + return artistname + + artistnum = 0 + discogs_websites = [url for url in metadata['artistwebsites'] if 'discogs' in url] + if len(discogs_websites) == 1: + artistnum = discogs_websites[0].split('/')[-1] + artist = self.client.artist(artistnum) + artistname = 
str(artist.name) + logging.debug('Found a singular discogs artist URL using %s instead of %s', artistname, + metadata['artist']) + elif len(discogs_websites) > 1: + for website in discogs_websites: + artistnum = website.split('/')[-1] + artist = self.client.artist(artistnum) + webartistname = str(artist.name) + if nowplaying.utils.normalize(webartistname) == nowplaying.utils.normalize( + metadata['artist']): + logging.debug( + 'Found near exact match discogs artist URL %s using %s instead of %s', + website, webartistname, metadata['artist']) + artistname = webartistname + break + return artistname + def _find_discogs_artist_releaselist(self, metadata): + ''' given metadata, find the releases for an artist ''' if not self.client and not self._setup_client(): return None if not self.client: return None - artistnum = 0 - artistname = metadata['artist'] - # 'https://www.discogs.com/artist/' - if metadata.get('artistwebsites'): - discogs_website = [url for url in metadata['artistwebsites'] if 'discogs' in url] - if len(discogs_website) == 1: - artistnum = discogs_website[0].split('/')[-1] - artist = self.client.artist(artistnum) - artistname = str(artist.name) - logging.debug('Found a singular discogs artist URL using %s instead of %s', - artistname, metadata['artist']) - elif len(discogs_website) > 1: - for website in discogs_website: - artistnum = website.split('/')[-1] - artist = self.client.artist(artistnum) - webartistname = str(artist.name) - if nowplaying.utils.normalize(webartistname) == nowplaying.utils.normalize( - metadata['artist']): - logging.debug( - 'Found near exact match discogs artist URL %s using %s instead of %s', - website, webartistname, metadata['artist']) - artistname = webartistname + artistname = self._find_discogs_website(metadata) try: logging.debug('Fetching %s - %s', artistname, metadata['album']) @@ -105,6 +116,7 @@ def download(self, metadata=None, imagecache=None): # pylint: disable=too-many- return None oldartist = metadata['artist'] + artistresultlist = None for variation in nowplaying.utils.artist_name_variations(metadata['artist']): metadata['artist'] = variation artistresultlist = self._find_discogs_artist_releaselist(metadata) diff --git a/nowplaying/artistextras/wikimedia.py b/nowplaying/artistextras/wikimedia.py new file mode 100644 index 00000000..a74b2c82 --- /dev/null +++ b/nowplaying/artistextras/wikimedia.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 +''' start of support of discogs ''' + +import logging +from nowplaying.vendor import wptools + +from nowplaying.artistextras import ArtistExtrasPlugin + +class Plugin(ArtistExtrasPlugin): + ''' handler for discogs ''' + + def __init__(self, config=None, qsettings=None): + super().__init__(config=config, qsettings=qsettings) + self.displayname = "Wikimedia" + + def download(self, metadata=None, imagecache=None): + ''' download content ''' + + mymeta = {} + print(metadata['artistwebsites']) + if not metadata.get('artistwebsites'): + logging.debug('No artistwebsites.') + return None + + wikidata_websites = [url for url in metadata['artistwebsites'] if 'wikidata' in url] + for website in wikidata_websites: + entity = website.split('/')[-1] + page = wptools.page(wikibase=entity, silent=True) + page.get() + + if page.data['extext'] and self.config.cparser.value('wikimedia/bio', type=bool): + mymeta['artistlongbio'] = page.data['extext'] + if page.data['claims'].get('P434'): + mymeta['musicbrainzartistid'] = page.data['claims'].get('P434') + mymeta['artistwebsites'] = [] + if page.data['claims'].get('P1953'): + 
mymeta['artistwebsites'].append( + f"https://discogs.com/artist/{page.data['claims'].get('P1953')[0]}") + mymeta['artistfanarturls'] = [] + thumbs = [] + if page.images(): + for image in page.images(['kind', 'url']): + if image['kind'] in ['wikidata-image', 'parse-image' + ] and self.config.cparser.value('wikimedia/fanart', + type=bool): + mymeta['artistfanarturls'].append(image['url']) + elif image['kind'] == 'query-thumbnail': + thumbs.append(image['url']) + + if thumbs and self.config.cparser.value('wikimedia/thumbnails', type=bool): + imagecache.fill_queue(config=self.config, + artist=metadata['artist'], + imagetype='artistthumb', + urllist=thumbs) + return mymeta + + def providerinfo(self): # pylint: disable=no-self-use + ''' return list of what is provided by this plug-in ''' + return ['artistlongbio', 'wikimedia-artistfanarturls', 'artistwebsites'] + + def load_settingsui(self, qwidget): + ''' draw the plugin's settings page ''' + if self.config.cparser.value('wikimedia/enabled', type=bool): + qwidget.wikimedia_checkbox.setChecked(True) + else: + qwidget.wikimedia_checkbox.setChecked(False) + + for field in ['bio', 'fanart', 'thumbnails', 'websites']: + func = getattr(qwidget, f'{field}_checkbox') + func.setChecked(self.config.cparser.value(f'wikimedia/{field}', type=bool)) + + def save_settingsui(self, qwidget): + ''' take the settings page and save it ''' + + self.config.cparser.setValue('wikimedia/enabled', qwidget.wikimedia_checkbox.isChecked()) + + for field in ['bio', 'fanart', 'thumbnails', 'websites']: + func = getattr(qwidget, f'{field}_checkbox') + self.config.cparser.setValue(f'wikimedia/{field}', func.isChecked()) + + def defaults(self, qsettings): + for field in ['bio', 'fanart', 'thumbnails', 'websites']: + qsettings.setValue(f'wikimedia/{field}', False) + + qsettings.setValue('wikimedia/enabled', False) diff --git a/nowplaying/config.py b/nowplaying/config.py index beb9389f..3da7caa2 100755 --- a/nowplaying/config.py +++ b/nowplaying/config.py @@ -243,15 +243,17 @@ def plugins_load_settingsui(self, qtwidgets): for plugintype, plugtypelist in self.plugins.items(): for key in plugtypelist: widgetkey = key.split('.')[-1] - self.pluginobjs[plugintype][key].load_settingsui( - qtwidgets[f'{plugintype}_{widgetkey}']) + if qtwidgets[f'{plugintype}_{widgetkey}']: + self.pluginobjs[plugintype][key].load_settingsui( + qtwidgets[f'{plugintype}_{widgetkey}']) def plugins_verify_settingsui(self, inputname, qtwidgets): ''' configure the defaults for plugins ''' for plugintype, plugtypelist in self.plugins.items(): for key in plugtypelist: widgetkey = key.split('.')[-1] - if (widgetkey == inputname and plugintype == 'inputs') or (plugintype != 'inputs'): + if (widgetkey == inputname and plugintype == 'inputs' + ) or (plugintype != 'inputs') and qtwidgets[f'{plugintype}_{widgetkey}']: self.pluginobjs[plugintype][key].verify_settingsui( qtwidgets[f'{plugintype}_{widgetkey}']) @@ -260,12 +262,15 @@ def plugins_save_settingsui(self, qtwidgets): for plugintype, plugtypelist in self.plugins.items(): for key in plugtypelist: widgetkey = key.split('.')[-1] - self.pluginobjs[plugintype][key].save_settingsui( - qtwidgets[f'{plugintype}_{widgetkey}']) + if qtwidgets[f'{plugintype}_{widgetkey}']: + self.pluginobjs[plugintype][key].save_settingsui( + qtwidgets[f'{plugintype}_{widgetkey}']) def plugins_description(self, plugintype, plugin, qtwidget): ''' configure the defaults for input plugins ''' - self.pluginobjs[plugintype][f'nowplaying.{plugintype}.{plugin}'].desc_settingsui(qtwidget) + if 
qtwidget: + self.pluginobjs[plugintype][f'nowplaying.{plugintype}.{plugin}'].desc_settingsui( + qtwidget) # pylint: disable=too-many-arguments def put(self, initialized, notif, loglevel): diff --git a/nowplaying/musicbrainz.py b/nowplaying/musicbrainz.py index cf200355..dc779663 100755 --- a/nowplaying/musicbrainz.py +++ b/nowplaying/musicbrainz.py @@ -2,6 +2,7 @@ # pylint: disable=invalid-name ''' support for musicbrainz ''' +import contextlib import logging import logging.config import logging.handlers @@ -146,13 +147,10 @@ def isrc(self, isrclist): mbdata = {} for isrc in isrclist: - try: + with contextlib.suppress(Exception): mbdata = musicbrainzngs.get_recordings_by_isrc(isrc, includes=['releases'], release_status=['official']) - except Exception: # pylint: disable=broad-except - pass - if not mbdata: for isrc in isrclist: try: @@ -331,7 +329,7 @@ def _websites(self, idlist): type=bool): sitelist.append(urlrel['target']) logging.debug('placed %s', dest) - return sitelist + return list(dict.fromkeys(sitelist)) def providerinfo(self): # pylint: disable=no-self-use ''' return list of what is provided by this recognition system ''' diff --git a/nowplaying/resources/artistextras_wikimedia_ui.ui b/nowplaying/resources/artistextras_wikimedia_ui.ui new file mode 100644 index 00000000..36d63443 --- /dev/null +++ b/nowplaying/resources/artistextras_wikimedia_ui.ui @@ -0,0 +1,168 @@ + + + wikimedia_ui + + + + 0 + 0 + 640 + 480 + + + + Wikimedia + + + + + 10 + 40 + 361 + 23 + + + + Enable Metadata by Wikimedia + + + + + + 20 + 240 + 591 + 101 + + + + Wikidata is a free, collaborative, multilingual, secondary database, collecting structured data to provide support for Wikipedia, Wikimedia Commons, the other wikis of the Wikimedia movement, and to anyone in the world. 
+ + + true + + + + + + 40 + 120 + 541 + 80 + + + + + + + false + + + Fanart + + + + + + + false + + + Thumbnails + + + + + + + Websites + + + + + + + + false + + + + 40 + 80 + 88 + 24 + + + + Biography + + + + + + + wikimedia_checkbox + toggled(bool) + fanart_checkbox + setEnabled(bool) + + + 190 + 51 + + + 72 + 231 + + + + + wikimedia_checkbox + toggled(bool) + thumbnails_checkbox + setEnabled(bool) + + + 190 + 51 + + + 85 + 291 + + + + + wikimedia_checkbox + toggled(bool) + bio_checkbox + setEnabled(bool) + + + 190 + 51 + + + 70 + 164 + + + + + wikimedia_checkbox + toggled(bool) + websites_checkbox + setEnabled(bool) + + + 190 + 51 + + + 491 + 159 + + + + + diff --git a/nowplaying/settingsui.py b/nowplaying/settingsui.py index 929d9382..1a5424c5 100755 --- a/nowplaying/settingsui.py +++ b/nowplaying/settingsui.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 ''' user interface to configure ''' +import contextlib import logging import os import pathlib @@ -68,12 +69,9 @@ def _setup_widgets(self, uiname, displayname=None): if not self.widgets[uiname]: return - try: + with contextlib.suppress(AttributeError): qobject_connector = getattr(self, f'_connect_{uiname}_widget') qobject_connector(self.widgets[uiname]) - except AttributeError: - pass - self.qtui.settings_stack.addWidget(self.widgets[uiname]) self._load_list_item(f'{uiname}', self.widgets[uiname], displayname) @@ -323,6 +321,11 @@ def _upd_win_discordbot(self): def _upd_win_quirks(self): ''' update the quirks settings to match config ''' + def _set_quirks_modes(arg0, arg1, arg2): + self.widgets['quirks'].slash_nochange.setChecked(arg0) + self.widgets['quirks'].slash_toback.setChecked(arg1) + self.widgets['quirks'].slash_toforward.setChecked(arg2) + # file system notification method if self.config.cparser.value('quirks/pollingobserver', type=bool): self.widgets['quirks'].fs_events_button.setChecked(False) @@ -342,19 +345,11 @@ def _upd_win_quirks(self): slashmode = self.config.cparser.value('quirks/slashmode') or 'nochange' if slashmode == 'nochange': - self.widgets['quirks'].slash_nochange.setChecked(True) - self.widgets['quirks'].slash_toback.setChecked(False) - self.widgets['quirks'].slash_toforward.setChecked(False) - + _set_quirks_modes(True, False, False) elif slashmode == 'toforward': - self.widgets['quirks'].slash_nochange.setChecked(False) - self.widgets['quirks'].slash_toback.setChecked(False) - self.widgets['quirks'].slash_toforward.setChecked(True) - + _set_quirks_modes(False, False, True) elif slashmode == 'toback': - self.widgets['quirks'].slash_nochange.setChecked(False) - self.widgets['quirks'].slash_toback.setChecked(True) - self.widgets['quirks'].slash_toforward.setChecked(False) + _set_quirks_modes(False, True, False) def _upd_win_plugins(self): ''' tell config to trigger plugins to update windows ''' diff --git a/nowplaying/vendor/bin.pyi b/nowplaying/vendor/bin.pyi new file mode 100644 index 00000000..144d76ec --- /dev/null +++ b/nowplaying/vendor/bin.pyi @@ -0,0 +1 @@ +from bin import * \ No newline at end of file diff --git a/nowplaying/vendor/bin/wptool b/nowplaying/vendor/bin/wptool new file mode 100755 index 00000000..897ee25b --- /dev/null +++ b/nowplaying/vendor/bin/wptool @@ -0,0 +1,8 @@ +#!/tmp/wnp/bin/python +# -*- coding: utf-8 -*- +import re +import sys +from wptools.wptool import main +if __name__ == '__main__': + sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) + sys.exit(main()) diff --git a/nowplaying/vendor/wptools.pyi b/nowplaying/vendor/wptools.pyi new file mode 100644 index 
00000000..7e33d50f --- /dev/null +++ b/nowplaying/vendor/wptools.pyi @@ -0,0 +1 @@ +from wptools import * \ No newline at end of file diff --git a/nowplaying/vendor/wptools/LICENSE b/nowplaying/vendor/wptools/LICENSE new file mode 100644 index 00000000..eb69d7d7 --- /dev/null +++ b/nowplaying/vendor/wptools/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2012-17 + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/nowplaying/vendor/wptools/__init__.py b/nowplaying/vendor/wptools/__init__.py new file mode 100644 index 00000000..03c3d975 --- /dev/null +++ b/nowplaying/vendor/wptools/__init__.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- + +""" +Wikipedia tools (for Humans) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Python and command-line MediaWiki access for Humans + +- get page extracts, image, Infobox data, Wikidata, and more +- get a random page, category, or site +- get page statistics +- get category members +- get site info and stats +- get data in any language +""" + +__author__ = "siznax" +__contact__ = "https://github.com/siznax/wptools" +__license__ = "MIT" +__title__ = "wptools" +__version__ = "0.4.17" + +from . import core +from . import query +#from . import request +from . import site +from . import utils + +from .category import WPToolsCategory as category +from .page import WPToolsPage as page +from .restbase import WPToolsRESTBase as restbase +from .site import WPToolsSite as site +from .wikidata import WPToolsWikidata as wikidata diff --git a/nowplaying/vendor/wptools/__main__.py b/nowplaying/vendor/wptools/__main__.py new file mode 100644 index 00000000..7748eeb5 --- /dev/null +++ b/nowplaying/vendor/wptools/__main__.py @@ -0,0 +1,5 @@ +from nowplaying.vendor.wptools.wptool import main, parse_args + + +if __name__ == "__main__": + main(parse_args()) diff --git a/nowplaying/vendor/wptools/category.py b/nowplaying/vendor/wptools/category.py new file mode 100644 index 00000000..90ad6db8 --- /dev/null +++ b/nowplaying/vendor/wptools/category.py @@ -0,0 +1,175 @@ +# -*- coding:utf-8 -*- + +""" +WPTools Category module +~~~~~~~~~~~~~~~~~~~~~~~ + +Support for getting Mediawiki category info. +""" + +from . 
import core + + +class WPToolsCategory(core.WPTools): + """ + WPToolsCategory class + """ + + def __init__(self, *args, **kwargs): + """ + Returns a WPToolsCategory object + + Gets a random category without arguments + + Optional positional {params}: + - [title]: category title + + Optional keyword {params}: + - [endpoint]: alternative API endpoint (default=/w/api.php) + - [lang]: Mediawiki language code (default=en) + - [namespace]: filter members (0=article, 14=category) + - [pageid]: category pageid + - [variant]: Mediawiki language variant + - [wiki]: alternative wiki site (default=wikipedia.org) + + Optional keyword {flags}: + - [silent]: do not echo page data if True + - [skip]: skip actions in this list + - [verbose]: verbose output to stderr if True + + See also: + https://www.mediawiki.org/wiki/Manual:Namespace + """ + super(WPToolsCategory, self).__init__(**kwargs) + + endpoint = kwargs.get('endpoint') + pageid = kwargs.get('pageid') + namespace = kwargs.get('namespace') + + title = None + if len(args) > 0: + title = args[0] + self.params.update({'title': title}) + + if endpoint: + self.params.update({'endpoint': endpoint}) + + if pageid: + try: + self.params.update({'pageid': int(pageid)}) + except ValueError: + raise ValueError("invalid pageid") + if title: + raise ValueError("cannot use both title AND pageid") + + if namespace or namespace == 0: + try: + self.params.update({'namespace': int(namespace)}) + except ValueError: + raise ValueError("invalid namespace") + + if not pageid and not title: + self.get_random() + + def _add_members(self, catmembers): + """ + Adds category members and subcategories to data + """ + members = [x for x in catmembers if x['ns'] == 0] + subcats = [x for x in catmembers if x['ns'] == 14] + + if 'members' in self.data: + self.data['members'].extend(members) + else: + self.data.update({'members': members}) + + if subcats: + if 'subcategories' in self.data: + self.data['subcategories'].extend(subcats) + else: + self.data.update({'subcategories': subcats}) + + def _query(self, action, qobj): + """ + Form query to enumerate category + """ + title = self.params.get('title') + pageid = self.params.get('pageid') + + if action == 'random': + return qobj.random(namespace=14) + elif action == 'category': + return qobj.category(title, pageid, self._continue_params()) + + def _set_data(self, action): + """ + Set category member data from API response + """ + data = self._load_response(action) + + self._handle_continuations(data, 'category') + + if action == 'category': + members = data.get('query').get('categorymembers') + if members: + self._add_members(members) + + if action == 'random': + rand = data['query']['random'][0] + data = {'pageid': rand.get('id'), + 'title': rand.get('title')} + self.data.update(data) + self.params.update(data) + + def get_members(self, show=True, proxy=None, timeout=0): + """ + GET Mediawiki:API (action=query) category members + https://www.mediawiki.org/wiki/API:Categorymembers + + Required {params}: title OR pageid + - title: article title + - pageid: Wikipedia database ID + + Optional arguments: + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - members: category members [{ns, pageid, title}] + """ + title = self.params.get('title') + pageid = self.params.get('pageid') + + if not title and not pageid: + raise LookupError("needs category title or pageid") + + self._get('category', show, proxy, timeout) + + while 
self.data.get('continue'): + self._get('category', show, proxy, timeout) + + return self + + def get_random(self, show=True, proxy=None, timeout=0): + """ + GET MediaWiki:API (action=query) for random category + https://www.mediawiki.org/wiki/API:Random + + Required {params}: None + + Optional arguments: + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - pageid: Wikipedia database ID + - title: article title + """ + self._get('random', show, proxy, timeout) + + # flush cache to allow repeated random requests + del self.cache['random'] + + return self diff --git a/nowplaying/vendor/wptools/core.py b/nowplaying/vendor/wptools/core.py new file mode 100644 index 00000000..f1b845bc --- /dev/null +++ b/nowplaying/vendor/wptools/core.py @@ -0,0 +1,320 @@ +# -*- coding:utf-8 -*- + +""" +WPTools core module +~~~~~~~~~~~~~~~~~~~ + +Support for accessing Wikimedia foundation APIs. +""" + +from time import sleep +import urllib.parse + +from nowplaying.vendor.wptools.query import WPToolsQuery + +from . import utils + +import requests +class WPTools(object): + """ + WPtools (abstract) core class + """ + + REQUEST_DELAY = 0 + REQUEST_LIMIT = 50 + + cache = None + data = None + flags = None + params = None + + def __init__(self, *args, **kwargs): + """ + Abstract initialization for... + - wptools.page + - wptools.category + - wptools.restbase + - wptools.wikidata + """ + self.cache = {} + self.data = {} + + self.flags = { + 'silent': kwargs.get('silent') or False, + 'verbose': kwargs.get('verbose') or False + } + + self.params = { + 'lang': kwargs.get('lang') or 'en', + } + + if len(args) > 0 and args[0]: # first positional arg is title + self.params.update({'title': args[0]}) + + if kwargs.get('skip'): + self.flags.update({'skip': kwargs.get('skip')}) + + if kwargs.get('variant'): + self.params.update({'variant': kwargs.get('variant')}) + + if kwargs.get('wiki'): + self.params.update({'wiki': kwargs.get('wiki')}) + + def _build_showstr(self, seed): + """ + Returns show() display string for data attribute + """ + output = ["%s (%s) data" % (seed, self.params['lang'])] + + output.append('{') + + maxwidth = WPToolsQuery.MAXWIDTH + + for item in sorted(self.data): + + if self.data[item] is None: + continue + + prefix = item + value = self.data[item] + + if isinstance(value, dict): + prefix = "%s: " % (prefix, len(value)) + value = ', '.join(value.keys()) + elif isinstance(value, int): + prefix = "%s:" % prefix + if 'pageid' not in prefix: + value = "{:,}".format(value) + elif isinstance(value, list): + prefix = "%s: " % (prefix, len(value)) + value = ', '.join((safestr(x) for x in value if x)) + elif isinstance(value, tuple): + prefix = "%s: " % (prefix, len(value)) + value = ', '.join((safestr(x) for x in value if x)) + elif utils.is_text(value): + value = value.strip().replace('\n', '') + if len(value) > (maxwidth - len(prefix)): + prefix = "%s: " % (prefix, len(value)) + else: + prefix = "%s:" % prefix + + output.append(" %s %s" % (prefix, value)) + + output.append('}') + + return output + + def _continue_params(self): + """ + Returns query string fragment continue parameters + """ + if not self.data.get('continue'): + return + + params = [] + for item in self.data['continue']: + params.append("&%s=%s" % (item, urllib.parse.quote_plus(self.data['continue'][item]))) + + return ''.join(params) + + def _handle_continuations(self, response, cache_key): + """ + Select continue params and clear cache or last continue 
params + """ + rcontinue = response.get('continue') + listen = ['blcontinue', 'cmcontinue', 'plcontinue'] + cparams = {} + + if rcontinue: + for flag in listen: + if rcontinue.get(flag): + cparams[flag] = rcontinue.get(flag) + + if cparams: + self.data['continue'] = cparams + del self.cache[cache_key] + else: # no more continuations + if 'continue' in self.data: + del self.data['continue'] + + def _get(self, action, show, proxy, timeout): + """ + make HTTP request and cache response + """ + silent = self.flags['silent'] + + if action in self.cache: + if action != 'imageinfo' and action != 'labels': + utils.stderr("+ %s results in cache" % action, silent) + return + else: + self.cache[action] = {} + + if self.flags.get('skip') and action in self.flags['skip']: + if not self.flags['silent']: + utils.stderr("+ skipping %s" % action) + return + + if 'requests' not in self.data: + self.data['requests'] = [] + + if len(self.data['requests']) >= self.REQUEST_LIMIT: + raise StopIteration("Hit REQUEST_LIMIT = %d" % self.REQUEST_LIMIT) + + if self.data['requests'] and self.REQUEST_DELAY: + utils.stderr("REQUEST_DELAY = %d seconds" % self.REQUEST_DELAY) + sleep(self.REQUEST_DELAY) + + # make the request + qobj = WPToolsQuery(lang=self.params['lang'], + variant=self.params.get('variant'), + wiki=self.params.get('wiki'), + endpoint=self.params.get('endpoint')) + qstr = self._query(action, qobj) + response = requests.get(qstr, qobj.status) + self.cache[action]['query'] = qstr + self.cache[action]['response'] = response.json() + + self.data['requests'].append(action) + + self._set_data(action) + + if show and not self.flags.get('silent'): + self.show() + + def _load_response(self, action): + """ + returns API reponse from cache or raises ValueError + """ + _query = self.cache[action]['query'].replace('&format=json', '') + response = self.cache[action]['response'] + + if not response: + raise ValueError("Empty response: %s" % self.params) + + data = response + + if data.get('warnings'): + if 'WARNINGS' in self.data: + self.data['WARNINGS'].update(data['warnings']) + else: + self.data['WARNINGS'] = data['warnings'] + + if data.get('error'): + utils.stderr("API error: %s" % data.get('error')) + raise LookupError(_query) + + if 'query' in action and data.get('query'): + if data['query'].get('pages'): + if data['query']['pages'][0].get('missing'): + raise LookupError(_query) + + if action == 'parse' and not data.get('parse'): + raise LookupError(_query) + + if action == 'wikidata': + handle_wikidata_errors(data, _query) + + return data + + def _query(self, action, qobj): + """ + Abstract method that returns WPToolsQuery string + """ + raise NotImplementedError("A subclass must implement this method.") + + def _set_data(self, action): + """ + Abstract method to capture API response data + """ + raise NotImplementedError("A subclass must implement this method.") + + def info(self, action=None): + """ + returns cached request info for given action, + or list of cached actions + """ + if action in self.cache: + return self.cache[action]['info'] + return self.cache.keys() or None + + def query(self, action=None): + """ + returns cached query string (without &format=json) for given action, + or list of cached actions + """ + if action in self.cache: + return self.cache[action]['query'].replace('&format=json', '') + return self.cache.keys() or None + + def response(self, action=None): + """ + returns cached response (as dict) for given action, + or list of cached actions + """ + if action in self.cache: + return 
self.cache[action]['response'] + return self.cache.keys() or None + + def show(self): + """ + Pretty-print instance data + """ + if not self.data: + return + + if self.data.get('continue'): + return + + ptitle = self.params.get('title') + dtitle = self.data.get('title') + pageid = self.params.get('pageid') + + seed = dtitle or ptitle or pageid + if utils.is_text(seed): + seed = seed.replace('_', ' ') + + prettyprint(self._build_showstr(seed)) + + +def handle_wikidata_errors(data, query): + """ + Raises LookupError if wikidata error found + """ + entities = data.get('entities') + + if not entities: + raise LookupError(query) + elif '-1' in entities: + raise LookupError(query) + else: + item = list(entities.values())[0] + if 'missing' in item: + errmsg = "wikidata item %s has been deleted" % item['id'] + raise LookupError(errmsg) + + +def prettyprint(datastr): + """ + Print page data strings to stderr + """ + maxwidth = WPToolsQuery.MAXWIDTH + rpad = WPToolsQuery.RPAD + + extent = maxwidth - (rpad + 2) + for line in datastr: + if len(line) >= maxwidth: + line = line[:extent] + '...' + utils.stderr(line) + + +def safestr(text): + """ + Safely convert unicode to a string + """ + if text is None: + return + try: + return str(text) + except UnicodeEncodeError: + return str(text.encode('utf-8')) diff --git a/nowplaying/vendor/wptools/page.py b/nowplaying/vendor/wptools/page.py new file mode 100644 index 00000000..faeef032 --- /dev/null +++ b/nowplaying/vendor/wptools/page.py @@ -0,0 +1,745 @@ +# -*- coding:utf-8 -*- + +""" +WPTools Page module +~~~~~~~~~~~~~~~~~~~ + +Support for getting Wikimedia page info. + +Access to Wikimedia APIs: + +- Mediawiki: https://www.mediawiki.org/wiki/API:Main_page +- RESTBase: https://www.mediawiki.org/wiki/RESTBase +- Wikidata: https://www.wikidata.org/wiki/Wikidata:Data_access + +See also: + +- https://www.mediawiki.org/wiki/Manual:Page_table +""" + +from bs4 import BeautifulSoup + +from . import core +from . import utils + +from .restbase import WPToolsRESTBase +from .wikidata import WPToolsWikidata + + +class WPToolsPage(WPToolsRESTBase, + WPToolsWikidata, + core.WPTools): + """ + WPtools Page class, derived from wptools.core + """ + + def __init__(self, *args, **kwargs): + """ + Returns a WPToolsPage object + + Gets a random title without arguments + + Optional positional {params}: + - [title]: Mediawiki page title, file, category, etc. + + Optional keyword {params}: + - [boxterm]: Infobox title name or substring + - [endpoint]: alternative API endpoint (default=/w/api.php) + - [lang]: Mediawiki language code (default=en) + - [pageid]: Mediawiki pageid + - [variant]: Mediawiki language variant + - [wiki]: alternative wiki site (default=wikipedia.org) + - [wikibase]: Wikidata database ID (e.g. 
'Q1') + + Optional keyword {flags}: + - [silent]: do not echo page data if True + - [skip]: skip actions in this list + - [verbose]: verbose output to stderr if True + """ + super(WPToolsPage, self).__init__(*args, **kwargs) + + title = self.params.get('title') + + boxterm = kwargs.get('boxterm') + if boxterm: + self.params.update({'boxterm': boxterm}) + + endpoint = kwargs.get('endpoint') + if endpoint: + self.params.update({'endpoint': endpoint}) + + pageid = kwargs.get('pageid') + if pageid: + self.params.update({'pageid': pageid}) + + wikibase = kwargs.get('wikibase') + if wikibase: + self.params.update({'wikibase': wikibase}) + + if not title and not pageid and not wikibase: + self.get_random() + else: + self.show() + + def __insert_image_info(self, title, _from, info): + """ + Insert API image INFO into matching image dict + + We make one imageinfo request containing only unique image + filenames. We reduce duplication by asking for image data per + file, instead of per "kind" or source (Wikipedia, Wikidata, + etc.), because some sources reference the same image file. We + match API imageinfo response data to existing image filenames + by API title or normalized "from" title. So, some imageinfo + data will be applied to more than one image "kind" (source) if + they share the same filename. + """ + for img in self.data['image']: + if 'url' not in img: + if title == img['file']: # matching title/file + img.update(info) + elif _from == img['file']: # matching from/file + img.update(info) + + def __pull_image_info(self, title, imageinfo, normalized): + """ + Pull image INFO from API response and insert + """ + for info in imageinfo: + info.update({'title': title}) + + # get API normalized "from" filename for matching + _from = None + for norm in normalized: + if title == norm['to']: + _from = norm['from'] + + # let's put all "metadata" in one member + info['metadata'] = {} + extmetadata = info.get('extmetadata') + if extmetadata: + info['metadata'].update(extmetadata) + del info['extmetadata'] + + self.__insert_image_info(title, _from, info) + + def _extend_data(self, datapoint, new_data): + """ + extend or assign new data to datapoint + """ + if new_data: + try: + self.data[datapoint].extend(new_data) + except KeyError: + self.data[datapoint] = new_data + + def _missing_imageinfo(self): + """ + returns list of image filenames that are missing info + """ + if 'image' not in self.data: + return + missing = [] + for img in self.data['image']: + if 'url' not in img: + missing.append(img['file']) + return list(set(missing)) + + def _normalize_images(self): + """ + normalizes image filenames by prepending 'File:' if needed + """ + if 'image' not in self.data: + return + for img in self.data['image']: + fname = img['file'].replace('_', ' ') + fstart = fname.startswith('File:') + istart = fname.startswith('Image:') + if not fstart and not istart: + fname = 'File:' + fname + img['orig'] = img['file'] + img['file'] = fname + + def _query(self, action, qobj): + """ + returns WPToolsQuery string + """ + title = self.params.get('title') + pageid = self.params.get('pageid') + wikibase = self.params.get('wikibase') + + qstr = None + + if action == 'random': + qstr = qobj.random() + elif action == 'query': + qstr = qobj.query(title, pageid, self._continue_params()) + elif action == 'querymore': + qstr = qobj.querymore(title, pageid, self._continue_params()) + elif action == 'parse': + qstr = qobj.parse(title, pageid) + elif action == 'imageinfo': + qstr = qobj.imageinfo(self._missing_imageinfo()) + 
elif action == 'labels': + qstr = qobj.labels(self._pop_entities()) + elif action == 'wikidata': + qstr = qobj.wikidata(title, wikibase) + elif action == 'restbase': + qstr = qobj.restbase(self.params.get('rest_endpoint'), title) + + if qstr is None: + raise ValueError("Unknown action: %s" % action) + + return qstr + + def _set_data(self, action): + """ + marshals response data into page data + """ + if 'query' in action: + self._set_query_data(action) + elif action == 'imageinfo': + self._set_imageinfo_data() + elif action == 'parse': + self._set_parse_data() + elif action == 'random': + self._set_random_data() + elif action == 'labels': + self._set_labels() + elif action == 'wikidata': + self._set_wikidata() + self.get_labels() + elif action == 'restbase': + self._set_restbase_data() + + self._update_imageinfo() + self._update_params() + + def _set_imageinfo_data(self): + """ + set image attributes from MediaWiki API:Imageinfo response + """ + data = self._load_response('imageinfo') + pages = data['query'].get('pages') + + normalized = [] + if 'normalized' in data['query']: + normalized = data['query']['normalized'] + + for page in pages: + title = page.get('title') + imageinfo = page.get('imageinfo') + if imageinfo: + self.__pull_image_info(title, imageinfo, normalized) + + # Mark missing imageinfo to prevent duplicate requests + for img in self.data['image']: + if 'url' not in img: + img['url'] = 'MISSED' + + def _set_parse_data(self): + """ + set attributes derived from MediaWiki (action=parse) + """ + pdata = self._load_response('parse')['parse'] + + self.data['iwlinks'] = utils.get_links(pdata.get('iwlinks')) + self.data['pageid'] = pdata.get('pageid') + self.data['wikitext'] = pdata.get('wikitext') + + parsetree = pdata.get('parsetree') + self.data['parsetree'] = parsetree + + boxterm = self.params.get('boxterm') + if boxterm: + infobox = utils.get_infobox(parsetree, boxterm) + else: + infobox = utils.get_infobox(parsetree) + self.data['infobox'] = infobox + + title = pdata.get('title') + if title: + self.data['title'] = title + if not self.params.get('title'): + self.params['title'] = title + + wikibase = pdata.get('properties').get('wikibase_item') + if wikibase: + self.data['wikibase'] = wikibase + self.data['wikidata_url'] = utils.wikidata_url(wikibase) + + if self.data['infobox']: + self._set_parse_image(self.data['infobox']) + + def _set_parse_image(self, infobox): + """ + set image data from action=parse response + """ + image = infobox.get('image') + cover = infobox.get('Cover') or infobox.get('cover') + + if image or cover: + if 'image' not in self.data: + self.data['image'] = [] + + if image and utils.isfilename(image): + self.data['image'].append({'kind': 'parse-image', 'file': image}) + + if cover and utils.isfilename(cover): + self.data['image'].append({'kind': 'parse-cover', 'file': cover}) + + def _set_query_data(self, action='query'): + """ + set attributes derived from MediaWiki (action=query) + """ + data = self._load_response(action) + page = data['query']['pages'][0] + + self._handle_continuations(data, action) + + if action == 'query': + self.data['random'] = data['query']['random'][0]["title"] + + self._extend_data('backlinks', data['query'].get('backlinks')) + + self.data['redirected'] = data['query'].get('redirects') + + self._set_query_data_fast_1(page) # avoid pylint too-many-branches + self._set_query_data_fast_2(page) + self._set_query_data_slow(page) + + def _set_query_data_fast_1(self, page): + """ + set less expensive action=query response data PART 1 
+ """ + self.data['pageid'] = page.get('pageid') + + assessments = page.get('pageassessments') + if assessments: + self.data['assessments'] = assessments + + extract = page.get('extract') + if extract: + self.data['extract'] = extract + soup = BeautifulSoup(extract, 'html.parser') + extext = soup.get_text() + if extext: + self.data['extext'] = extext.strip() + + fullurl = page.get('fullurl') + if fullurl: + self.data['url'] = fullurl + self.data['url_raw'] = fullurl + '?action=raw' + + length = page.get('length') + if length: + self.data['length'] = length + + self._extend_data('links', utils.get_links(page.get('links'))) + + self._update_data('modified', 'page', page.get('touched')) + + pageprops = page.get('pageprops') + if pageprops: + wikibase = pageprops.get('wikibase_item') + if wikibase: + self.data['wikibase'] = wikibase + self.data['wikidata_url'] = utils.wikidata_url(wikibase) + + if 'disambiguation' in pageprops: + self.data['disambiguation'] = len(self.data['links']) + + def _set_query_data_fast_2(self, page): + """ + set less expensive action=query response data PART 2 + """ + self.data['pageid'] = page.get('pageid') + + redirects = page.get('redirects') + if redirects: + self.data['redirects'] = redirects + + terms = page.get('terms') + if terms: + if terms.get('alias'): + self.data['aliases'] = terms['alias'] + + if terms.get('description'): + self.data['description'] = next(iter(terms['description']), + None) + if terms.get('label'): + self.data['label'] = next(iter(terms['label']), None) + + title = page.get('title') + self.data['title'] = title + if not self.params.get('title'): + self.params['title'] = title + + watchers = page.get('watchers') + if watchers: + self.data['watchers'] = watchers + + self._set_query_image(page) + + def _set_query_data_slow(self, page): + """ + set more expensive action=query response data + """ + categories = page.get('categories') + if categories: + self.data['categories'] = [x['title'] for x in categories] + + if page.get('contributors'): + contributors = page.get('contributors') or 0 + anoncontributors = page.get('anoncontributors') or 0 + if isinstance(contributors, list): + contributors = len(contributors) + self.data['contributors'] = contributors + anoncontributors + + files = page.get('images') # really, these are FILES + if files: + self.data['files'] = [x['title'] for x in files] + + languages = page.get('langlinks') + if languages: + self.data['languages'] = languages + + pageviews = page.get('pageviews') + if pageviews: + values = [x for x in pageviews.values() if x] + if values: + self.data['views'] = int(sum(values) / len(values)) + else: + self.data['views'] = 0 + + def _set_query_image(self, page): + """ + set image data from action=query response + """ + pageimage = page.get('pageimage') + thumbnail = page.get('thumbnail') + + if pageimage or thumbnail: + if 'image' not in self.data: + self.data['image'] = [] + + if pageimage: + self.data['image'].append({ + 'kind': 'query-pageimage', + 'file': pageimage}) + + if thumbnail: + qthumb = {'kind': 'query-thumbnail'} + qthumb.update(thumbnail) + qthumb['url'] = thumbnail.get('source') + del qthumb['source'] + qthumb['file'] = qthumb['url'].split('/')[-2] + self.data['image'].append(qthumb) + + def _set_random_data(self): + """ + sets page data from random request + """ + rdata = self._load_response('random') + rdata = rdata['query']['random'][0] + + pageid = rdata.get('id') + title = rdata.get('title') + + self.data.update({'pageid': pageid, + 'title': title}) + + def 
_update_data(self, datapoint, key, new_data): + """ + update or assign new data to datapoint + """ + if new_data: + try: + self.data[datapoint].update({key: new_data}) + except KeyError: + self.data[datapoint] = {key: new_data} + + def _update_imageinfo(self): + """ + calls get_imageinfo() if data image missing info + """ + missing = self._missing_imageinfo() + deferred = self.flags.get('defer_imageinfo') + continuing = self.data.get('continue') + + if missing and not deferred and not continuing: + self.get_imageinfo(show=False) + + def _update_params(self): + """ + update params from response data + """ + if self.data.get('title'): + self.params['title'] = self.data.get('title') + if self.data.get('pageid'): + self.params['pageid'] = self.data.get('pageid') + if self.data.get('wikibase'): + self.params['wikibase'] = self.data.get('wikibase') + + def skip_action(self, action): + """ + append action to skip flag + """ + if 'skip' not in self.flags: + self.flags['skip'] = [] + self.flags['skip'].append(action) + + def get(self, show=True, proxy=None, timeout=0): + """ + Make Mediawiki, RESTBase, and Wikidata requests for page data + some sequence of: + - get_parse() + - get_query() + - get_restbase() + - get_wikidata() + """ + wikibase = self.params.get('wikibase') + + if wikibase: + + self.flags['defer_imageinfo'] = True + + self.get_wikidata(False, proxy, timeout) + self.get_query(False, proxy, timeout) + self.get_parse(False, proxy, timeout) + + self.flags['defer_imageinfo'] = False + + self.get_restbase('/page/summary/', False, proxy, timeout) + + if show and not self.flags.get('silent'): + self.show() + + else: + + self.flags['defer_imageinfo'] = True + + self.get_query(False, proxy, timeout) + self.get_parse(False, proxy, timeout) + + if not self.data.get('wikibase'): + self.skip_action('wikidata') + + self.get_wikidata(False, proxy, timeout) + + self.flags['defer_imageinfo'] = False + + wiki = self.params.get('wiki') + if wiki and 'wikipedia.org' not in wiki: + self.skip_action('restbase') + + self.get_restbase('/page/summary/', False, proxy, timeout) + + if show and not self.flags.get('silent'): + self.show() + + return self + + def get_imageinfo(self, show=True, proxy=None, timeout=0): + """ + GET MediaWiki request for API:Imageinfo + https://www.mediawiki.org/wiki/API:Imageinfo + + Required {data}: + - image: member () with 'file' and not 'url' + + Optional arguments: + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - image: member () image URLs, sizes, etc. + """ + if not self.data.get('image'): + raise ValueError("get_imageinfo needs a page image") + + if not self._missing_imageinfo() and 'imageinfo' in self.cache: + utils.stderr("complete imageinfo in cache", self.flags['silent']) + return + + self._normalize_images() + self._get('imageinfo', show, proxy, timeout) + + return self + + def get_more(self, show=True, proxy=None, timeout=0): + """ + Calls get_querymore() Is for convenience. You like. 
+ """ + return self.get_querymore(show, proxy, timeout) + + def get_parse(self, show=True, proxy=None, timeout=0): + """ + GET MediaWiki:API action=parse request + https://en.wikipedia.org/w/api.php?action=help&modules=parse + + Required {params}: title OR pageid + - title: article title + - pageid: Wikipedia database ID + + Optional arguments: + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - image: {parse-image, parse-cover} + - infobox: Infobox data as python dictionary + - iwlinks: interwiki links + - pageid: Wikipedia database ID + - parsetree: XML parse tree + - requests: list of request actions made + - wikibase: Wikidata entity ID or wikidata URL + - wikitext: raw wikitext URL + """ + if not self.params.get('title') and not self.params.get('pageid'): + raise ValueError("get_parse needs title or pageid") + + self._get('parse', show, proxy, timeout) + + return self + + def get_query(self, show=True, proxy=None, timeout=0): + """ + GET MediaWiki:API action=query selected data + https://en.wikipedia.org/w/api.php?action=help&modules=query + + Required {params}: title OR pageid + - title: article title + - pageid: Wikipedia database ID + + Optional arguments: + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - description: Wikidata description (via pageterms) + - extext: plain text (Markdown) extract + - extract: HTML extract from Extension:TextExtract + - image: {query-pageimage, query-thumbnail} + - label: Wikidata label (via pageterms) + - modified (page): ISO8601 date and time + - pageid: Wikipedia database ID + - random: a random article title with every request! + - requests: list of request actions made + - url: the canonical wiki URL + - url_raw: ostensible raw wikitext URL + - watchers: number of people watching this page + """ + if not self.params.get('title') and not self.params.get('pageid'): + raise ValueError("get_query needs title or pageid") + + self._get('query', show, proxy, timeout) + + while self.data.get('continue'): + self._get('query', show, proxy, timeout) + + return self + + def get_querymore(self, show=True, proxy=None, timeout=0): + """ + GET MediaWiki:API action=query for MORE data + A much more expensive (slower!) 
query for popular pages + https://en.wikipedia.org/w/api.php?action=help&modules=query + + Required {params}: title OR pageid + - title: article title + - pageid: Wikipedia database ID + + Optional arguments: + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - categories: list of categories used on the page + - contributors: total number of contributors + - files: list of files contained in the page + - languages: number of wiki languages having this page + - views: average daily page views over past 60 days + """ + if not self.params['title'] and not self.params['pageid']: + raise ValueError("get_query needs title or pageid") + + self._get('querymore', show, proxy, timeout) + + while self.data.get('continue'): + self._get('querymore', show, proxy, timeout) + + return self + + def get_random(self, show=True, proxy=None, timeout=0): + """ + GET MediaWiki:API (action=query) list=random + https://www.mediawiki.org/wiki/API:Random + + Required {params}: None + + Optional arguments: + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - pageid: Wikipedia database ID + - title: article title + """ + self._get('random', show, proxy, timeout) + + # flush cache to allow repeated random requests + del self.cache['random'] + + return self + + def images(self, fields=None, token=None): + """ + Returns image info keys for kind containing token + + Args: + - fields: image info values wanted + - token: substring to match image kind + + EXAMPLES + + Get all available image info: + >>> page.images() + + Get all image kinds: + >>> page.images('kind') + + Get only "query" (kind) image info + >>> page.images(token='query') + + Get only file, url fields from "wikidata" images + >>> page.images(['file', 'url'], token='wikidata') + """ + + if 'image' not in self.data: + return + + out = [] + for img in self.data['image']: + if token and token not in img['kind']: + continue + info = {} + for key in img: + if fields and key not in fields: + continue + info.update({key: img[key]}) + if info: + out.append(info) + + return out + + def pageimage(self, token=None): + """ + DEPRECATED see page.images() + """ + return self.images(token=token) diff --git a/nowplaying/vendor/wptools/query.py b/nowplaying/vendor/wptools/query.py new file mode 100644 index 00000000..fea0466e --- /dev/null +++ b/nowplaying/vendor/wptools/query.py @@ -0,0 +1,423 @@ +# -*- coding:utf-8 -*- + +""" +WPTools Query module +~~~~~~~~~~~~~~~~~~~~ + +Support for forming WMF API query strings. 
+ +* Mediawiki: https://www.mediawiki.org/wiki/API:Main_page +* Wikidata: https://www.wikidata.org/wiki/Wikidata:Data_access +* RESTBase: https://www.mediawiki.org/wiki/RESTBase + +See also: + +* WMF: https://wikimediafoundation.org/wiki/Our_projects +""" + +try: # python2 + from urllib import quote, unquote +except ImportError: # python3 + from urllib.parse import quote, unquote + +from string import Template + +import random + + +class WPToolsQuery(object): + """ + WPToolsQuery class + """ + + DEFAULT_ENDPOINT = '/w/api.php' + MAXWIDTH = 72 + RPAD = 4 + + IMAGEINFO = Template(( + "${WIKI}${ENDPOINT}?action=query" + "&format=json" + "&formatversion=2" + "&iiprop=size|url|timestamp|extmetadata" + "&prop=imageinfo" + "&titles=${FILES}")) + + LIST = Template(( + "${WIKI}${ENDPOINT}?action=query" + "&format=json" + "&formatversion=2" + "&list=${LIST}")) + + PARSE = Template(( + "${WIKI}${ENDPOINT}?action=parse" + "&format=json" + "&formatversion=2" + "&contentmodel=text" + "&disableeditsection=" + "&disablelimitreport=" + "&disabletoc=" + "&prop=text|iwlinks|parsetree|wikitext|displaytitle|properties" + "&redirects" + "&page=${PAGE}")) + + QUERY = Template(( + "${WIKI}${ENDPOINT}?action=query" + "&exintro" + "&format=json" + "&formatversion=2" + "&inprop=url|watchers" + "&list=random" + "&pithumbsize=240" + "&pllimit=500" + "&ppprop=disambiguation|wikibase_item" + "&prop=extracts|info|links|pageassessments|pageimages|pageprops" + "|pageterms|redirects" + "&redirects" + "&rdlimit=500" + "&rnlimit=1" + "&rnnamespace=0" + "&titles=${TITLES}")) + + QUERYMORE = Template(( + "${WIKI}${ENDPOINT}?action=query" + "&bllimit=500" + "&bltitle=${TITLES}" + "&cllimit=500" + "&clshow=!hidden" + "&format=json" + "&formatversion=2" + "&imlimit=500" + "&list=backlinks" + "&lllimit=500" + "&pclimit=500" + "&prop=categories|contributors|images|langlinks|pageviews" + "&redirects" + "&titles=${TITLES}")) + + WIKIDATA = Template(( + "${WIKI}${ENDPOINT}?action=wbgetentities" + "&format=json" + "&formatversion=2" + "&languages=${LANG}" + "&props=${PROPS}" + "&redirects=yes")) + + endpoint = None + lang = None + status = None + variant = None + wiki = None + + def __init__(self, lang='en', variant=None, wiki=None, endpoint=None): + """ + Returns a WPToolsQuery object + + Arguments: + - [lang=en]: Mediawiki language code + - [variant=None]: language variant + - [wiki=None]: alternative wiki site + - [endpoint=None]: alternative API endoint + """ + self.lang = lang + self.variant = variant + + self.wiki = wiki or "%s.wikipedia.org" % self.lang + self.domain = domain_name(self.wiki) + self.endpoint = endpoint or self.DEFAULT_ENDPOINT + self.uri = self.wiki_uri(self.wiki) + + def category(self, title, pageid=None, cparams=None, namespace=None): + """ + Returns category query string + """ + query = self.LIST.substitute( + WIKI=self.uri, + ENDPOINT=self.endpoint, + LIST='categorymembers') + status = pageid or title + + query += "&cmlimit=500" + + if namespace is not None: + query += "&cmnamespace=%d" % namespace + + if title and pageid: + title = None + + if title: + query += "&cmtitle=" + safequote(title) + + if pageid: + query += "&cmpageid=%d" % pageid + + if cparams: + query += cparams + status += ' (%s)' % cparams + + self.set_status('categorymembers', status) + + return query + + def labels(self, qids): + """ + Returns Wikidata labels query string + """ + if len(qids) > 50: + raise ValueError("The limit is 50.") + + self.domain = 'www.wikidata.org' + self.uri = self.wiki_uri(self.domain) + + query = 
self.WIKIDATA.substitute( + WIKI=self.uri, + ENDPOINT=self.endpoint, + LANG=self.variant or self.lang, + PROPS='labels') + + qids = '|'.join(qids) + query += "&ids=%s" % qids + + self.set_status('labels', qids) + + return query + + def imageinfo(self, files): + """ + Returns imageinfo query string + """ + files = '|'.join([safequote(x) for x in files]) + + self.set_status('imageinfo', files) + + return self.IMAGEINFO.substitute( + WIKI=self.uri, + ENDPOINT=self.endpoint, + FILES=files) + + def parse(self, title, pageid=None): + """ + Returns Mediawiki action=parse query string + """ + qry = self.PARSE.substitute( + WIKI=self.uri, + ENDPOINT=self.endpoint, + PAGE=safequote(title) or pageid) + + if pageid and not title: + qry = qry.replace('&page=', '&pageid=').replace('&redirects', '') + + if self.variant: + qry += '&variant=' + self.variant + + self.set_status('parse', pageid or title) + + return qry + + def query(self, titles, pageids=None, cparams=None): + """ + Returns MediaWiki action=query query string + """ + query = self.QUERY.substitute( + WIKI=self.uri, + ENDPOINT=self.endpoint, + TITLES=safequote(titles) or pageids) + status = titles or pageids + + if pageids and not titles: + query = query.replace('&titles=', '&pageids=') + + if cparams: + query += cparams + status += " (%s)" % cparams + + if self.variant: + query += '&variant=' + self.variant + + self.set_status('query', status) + + return query + + def querymore(self, titles, pageids=None, cparams=None): + """ + Returns MediaWiki action=query query string (for MORE) + A much more expensive query for popular pages + """ + query = self.QUERYMORE.substitute( + WIKI=self.uri, + ENDPOINT=self.endpoint, + TITLES=safequote(titles) or pageids) + + status = "%s" % (pageids or titles) + + if pageids and not titles: + query = query.replace('&titles=', '&pageids=') + + if cparams: + query += cparams + status += " (%s)" % cparams + + if self.variant: + query += '&variant=' + self.variant + + self.set_status('querymore', status) + + return query + + def random(self, namespace=0): + """ + Returns query string for random page + """ + query = self.LIST.substitute( + WIKI=self.uri, + ENDPOINT=self.endpoint, + LIST='random') + query += "&rnlimit=1&rnnamespace=%d" % namespace + + emoji = [ + u'\U0001f32f', # burrito or wrap + u'\U0001f355', # slice of pizza + u'\U0001f35c', # steaming bowl of ramen + u'\U0001f363', # sushi + u'\U0001f369', # doughnut + u'\U0001f36a', # cookie + u'\U0001f36d', # lollipop + u'\U0001f370', # strawberry shortcake + ] + + action = 'random' + if namespace: + action = 'random:%d' % namespace + + self.set_status(action, random.choice(emoji)) + + return query + + def restbase(self, endpoint, title): + """ + Returns RESTBase query string + """ + if not endpoint: + raise ValueError("invalid endpoint: %s" % endpoint) + + route = endpoint + if title and endpoint != '/page/': + route = endpoint + safequote_restbase(title) + + self.set_status('restbase', route) + + return "%s/api/rest_v1/%s" % (self.uri, route[1:]) + + def set_status(self, action, target): + """ + Sets query status with format: "{domain} ({action}) {target}" + """ + try: + target = unquote(target) + except (AttributeError, TypeError): + pass + + status = "%s (%s) %s" % (self.domain, action, target) + status = status.strip().replace('\n', '') + + if len(status) >= self.MAXWIDTH: + tail = '...' 
+ extent = self.MAXWIDTH - (len(tail) + self.RPAD) + self.status = status[:extent] + tail + else: + self.status = status + + def site(self, action): + """ + Returns site query + """ + query = None + viewdays = 7 + hostpath = self.uri + self.endpoint + + if action == 'siteinfo': + query = hostpath + ( + '?action=query' + '&meta=siteinfo|siteviews' + '&siprop=general|statistics' + '&list=mostviewed&pvimlimit=max') + query += '&pvisdays=%d' % viewdays # meta=siteviews + self.set_status('query', 'siteinfo|siteviews|mostviewed') + elif action == 'sitematrix': + query = hostpath + '?action=sitematrix' + self.set_status('sitematrix', 'all') + elif action == 'sitevisitors': + query = hostpath + ( + '?action=query' + '&meta=siteviews&pvismetric=uniques') + query += '&pvisdays=%d' % viewdays # meta=siteviews + self.set_status('query', 'siteviews:uniques') + + if not query: + raise ValueError("Could not form query") + + query += '&format=json&formatversion=2' + + return query + + def wiki_uri(self, wiki): + """ + Returns scheme://domain from wiki name + """ + if wiki.startswith('http'): + return wiki + return "https://" + self.domain + + def wikidata(self, title, wikibase=None): + """ + Returns Wikidata query string + """ + self.domain = 'www.wikidata.org' + self.uri = self.wiki_uri(self.domain) + + query = self.WIKIDATA.substitute( + WIKI=self.uri, + ENDPOINT=self.endpoint, + LANG=self.variant or self.lang, + PROPS="aliases|info|claims|descriptions|labels|sitelinks") + + if wikibase: + query += "&ids=%s" % wikibase + elif title: + title = safequote(title) + query += "&sites=%swiki" % self.lang + query += "&titles=%s" % title + + self.set_status('wikidata', wikibase or title) + + return query + + +def domain_name(wiki): + """ + Returns domain name from wiki name + """ + if '//' in wiki: + wiki = wiki.split('//')[1] + return wiki.split('/')[0] + + +def safequote(string): + """ + Try to UTF-8 encode and percent-quote string + """ + if string is None: + return + try: + return quote(string.encode('utf-8')) + except UnicodeDecodeError: + return quote(string) + + +def safequote_restbase(title): + """ + Safequote restbase title possibly having slash in title + """ + try: + return quote(title.encode('utf-8'), safe='') + except UnicodeDecodeError: + return quote(title, safe='') diff --git a/nowplaying/vendor/wptools/restbase.py b/nowplaying/vendor/wptools/restbase.py new file mode 100644 index 00000000..f3cd4d98 --- /dev/null +++ b/nowplaying/vendor/wptools/restbase.py @@ -0,0 +1,205 @@ +# -*- coding:utf-8 -*- + +""" +WPTools RESTBase module +~~~~~~~~~~~~~~~~~~~~~~~ + +Support for getting RESTBase page info. +""" + +try: # python2 + from urlparse import urlparse +except ImportError: # python3 + from urllib.parse import urlparse + +from . import core +from . import utils + + +class WPToolsRESTBase(core.WPTools): + """ + WPtoolsRESTBase class + """ + + def __init__(self, *args, **kwargs): + """ + Returns a WPToolsRESTBase object + + Optional positional {params}: + - [title]: Mediawiki page title, file, category, etc. 
+ + Optional keyword {params}: + - [lang]: Mediawiki language code (default=en) + + Optional keyword {flags}: + - [silent]: do not echo page data if True + - [skip]: skip actions in this list + - [verbose]: verbose output to stderr if True + """ + super(WPToolsRESTBase, self).__init__(*args, **kwargs) + + def _handle_response(self): + """ + returns RESTBase response if appropriate + """ + if not self.cache['restbase'].get('info'): + return + + content = self.cache['restbase']['info']['content-type'] + if content.startswith('text/html'): + html = self.cache['restbase']['response'] + if isinstance(html, bytes): + html = html.decode('utf-8') + self.data['html'] = html + return + + response = self._load_response('restbase') + + http_status = self.cache['restbase']['info']['status'] + if http_status == 404: + raise LookupError(self.cache['restbase']['query']) + + if self.params.get('endpoint') == '/page/': + msg = "RESTBase /page/ entry points: %s" % response.get('items') + utils.stderr(msg) + del self.cache['restbase'] + return + + return response + + def _query(self, action, qobj): + """ + returns WPToolsQuery string from action + """ + return qobj.restbase(self.params['rest_endpoint'], + self.params.get('title')) + + def _set_data(self, action): + """ + Sets RESTBase response data + """ + self._set_restbase_data() + + def _set_restbase_data(self): + res = self._handle_response() + if res is None: + return + + self.data['description'] = res.get('description') + self.data['pageid'] = (res.get('id') or res.get('pageid')) + self.data['exrest'] = res.get('extract') + self.data['exhtml'] = res.get('extract_html') + + lastmodified = res.get('lastmodified') + if lastmodified: + pagemod = {'page': lastmodified} + if 'modified' in self.data: + self.data['modified'].update(pagemod) + else: + self.data['modified'] = pagemod + + if res.get('sections'): + lead = res.get('sections')[0] + self.data['lead'] = lead.get('text') + + title = res.get('title') or res.get('normalizedtitle') + if title: + self.data['title'] = title.replace(' ', '_') + + wikibase = res.get('wikibase_item') + if wikibase: + self.data['wikibase'] = wikibase + self.data['wikidata_url'] = utils.wikidata_url(wikibase) + + url = urlparse(self.cache['restbase']['query']) + durl = "%s://%s/wiki/%s" % (url.scheme, + url.netloc, + self.params['title']) + self.data['url'] = durl + self.data['url_raw'] = durl + '?action=raw' + + self._unpack_images(res) + + def _unpack_images(self, rdata): + """ + Set image data from RESTBase response + """ + image = rdata.get('image') # /page/mobile-sections-lead + originalimage = rdata.get('originalimage') # /page/summary + thumbnail = rdata.get('thumbnail') # /page/summary + + if image or originalimage or thumbnail: + if 'image' not in self.data: + self.data['image'] = [] + + def file_url(info): + """ + put image source in url and set file key + """ + if 'source' in info: + info['url'] = info['source'] + info['file'] = info['source'].split('/')[-1] + del info['source'] + return info + + if image: + img = {'kind': 'restbase-image'} + img.update(image) + self.data['image'].append(file_url(img)) + + if originalimage: + img = {'kind': 'restbase-original'} + img.update(originalimage) + self.data['image'].append(file_url(img)) + + if thumbnail: + img = {'kind': 'restbase-thumb'} + img.update(thumbnail) + self.data['image'].append(file_url(img)) + + def get_restbase(self, endpoint='/page/', + show=True, proxy=None, timeout=0): + """ + GET RESTBase /page/ endpoints needing only {title} + 
https://en.wikipedia.org/api/rest_v1/ + + for example: + /page/ + /page/html/{title} + /page/summary/{title} + /page/mobile-sections-lead/{title} + + Required {params}: None + Without arguments, lists RESTBase /page/ entry points + + Optional {params}: + - [title]: Mediawiki page title, file, category, etc. + - [lang]: Mediawiki language code (default=en) + + Optional arguments: + - [endpoint]: RESTBase entry point (default=/page/) + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - exhtml: "extract_html" from /page/summary + - exrest: "extract" from /page/summary + - html: from /page/html + - image: {rest-image, rest-thumb} + - lead: section[0] from /page/mobile-sections-lead + - modified (page): ISO8601 date and time + - title: the article title + - url: the canonical wiki URL + - url_raw: probable raw wikitext URL + - wikibase: Wikidata item ID + - wikidata_url: Wikidata URL + """ + if endpoint != '/page/' and not self.params.get('title'): + raise StandardError("endpoint %s needs a title" % endpoint) + + self.params.update({'rest_endpoint': endpoint}) + + self._get('restbase', show, proxy, timeout) + + return self diff --git a/nowplaying/vendor/wptools/site.py b/nowplaying/vendor/wptools/site.py new file mode 100644 index 00000000..a6fa708d --- /dev/null +++ b/nowplaying/vendor/wptools/site.py @@ -0,0 +1,235 @@ +# -*- coding:utf-8 -*- + +""" +WPTools Site module +~~~~~~~~~~~~~~~~~~~ + +Support for getting Mediawiki site info. +""" + +from __future__ import print_function + +import random + +from . import core + + +class WPToolsSite(core.WPTools): + """ + WPToolsSite class + """ + + COMMONS = 'commons.wikimedia.org' + + def __init__(self, *args, **kwargs): + """ + Returns a WPToolsSite object. 
+ + Optional keyword {params}: + - [endpoint]: alternative API endpoint (default=/w/api.php) + - [lang]: Mediawiki language code (default=en) + - [wiki]: alternative wiki site (default=wikipedia.org) + + Optional keyword {flags}: + - [silent]: do not echo page data if True + - [skip]: skip actions in this list + - [verbose]: verbose output to stderr if True + """ + super(WPToolsSite, self).__init__(*args, **kwargs) + + endpoint = kwargs.get('endpoint') + if endpoint: + self.params.update({'endpoint': endpoint}) + + def _query(self, action, qobj): + """ + returns query string + """ + return qobj.site(action) + + def _set_data(self, action): + """ + capture Wikidata API response data + """ + if action == 'siteinfo': + self._set_siteinfo() + elif action == 'sitematrix': + self._set_sitematrix() + elif action == 'sitevisitors': + self._set_sitevisitors() + + def _set_siteinfo(self): + """ + capture API sitematrix data in data attribute + """ + data = self._load_response('siteinfo').get('query') + + mostviewed = data.get('mostviewed') + self.data['mostviewed'] = [] + for item in mostviewed[1:]: + if item['ns'] == 0: + self.data['mostviewed'].append(item) + + general = data.get('general') + + self.params.update({'title': general.get('sitename')}) + self.params.update({'lang': general.get('lang')}) + self.data['site'] = general.get('wikiid') + + info = {} + for item in general: + ginfo = general.get(item) + if ginfo: + info[item] = ginfo + self.data['info'] = info + + siteviews = data.get('siteviews') + if siteviews: + values = [x for x in siteviews.values() if x] + if values: + self.data['siteviews'] = int(sum(values) / len(values)) + else: + self.data['siteviews'] = 0 + + stats = data.get('statistics') + for item in stats: + self.data[item] = stats[item] + + def _set_sitematrix(self): + """ + capture API sitematrix data in data attribute + """ + data = self._load_response('sitematrix') + + self.params.update({'title': self.COMMONS}) + + matrix = data.get('sitematrix') + if matrix: + self.data['sites'] = self._sitelist(matrix) + self.data['random'] = random.choice(self.data['sites']) + + def _set_sitevisitors(self): + """ + capture API pageview/visitor data in data attribute + """ + data = self._load_response('sitevisitors').get('query') + + siteviews = data.get('siteviews') + if siteviews: + values = [x for x in siteviews.values() if x] + if values: + self.data['visitors'] = int(sum(values) / len(values)) + else: + self.data['visitors'] = 0 + + def _sitelist(self, matrix): + """ + Returns a list of sites from a SiteMatrix, optionally filtered + by 'domain' param + """ + _list = [] + for item in matrix: + sites = [] + + if isinstance(matrix[item], list): + sites = matrix[item] + elif isinstance(matrix[item], dict): + sites = matrix[item]['site'] + + for site in sites: + if len(site.keys()) > 4: # closed, fishbowl, private + continue + domain = self.params.get('domain') + if domain: + if domain in site['url']: + _list.append(site['url']) + else: + _list.append(site['url']) + + return _list + + def get_info(self, wiki=None, show=True, proxy=None, timeout=0): + """ + GET site info (general, statistics, siteviews, mostviewed) via + https://www.mediawiki.org/wiki/API:Siteinfo, and + https://www.mediawiki.org/wiki/Extension:PageViewInfo + + Optional arguments: + - [wiki]: alternate wiki site (default=en.wikipedia.org) + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - info: API:Siteinfo + - mostviewed: 
mostviewed articles {ns=0, title, count} + - site: sitename, e.g. 'enwiki' + - siteviews: sitewide pageview totals over last WEEK + - visitors: sitewide unique visitor total over last WEEK + - various counts: activeusers, admins, articles, edits, images + jobs, pages, queued-massmessages, siteviews, users, visitors + """ + if wiki: + self.params.update({'wiki': wiki}) + + self._get('siteinfo', show=False, proxy=proxy, timeout=timeout) + self._get('sitevisitors', show, proxy, timeout) + + return self + + def get_sites(self, domain=None, show=True, proxy=None, timeout=0): + """ + GET Wikimedia sites via Extension:SiteMatrix + https://www.mediawiki.org/wiki/Extension:SiteMatrix + + Optional params: + - [domain]: filter sites on this domain, e.g. 'wiktionary.org' + + Optional arguments: + - [show]: echo page data if true + - [proxy]: use this HTTP proxy + - [timeout]: timeout in seconds (0=wait forever) + + Data captured: + - random: randomly selected wiki site + - sites: of wiki sites (hundreds) from commons SiteMatrix + """ + if domain: + self.params.update({'domain': domain}) + + self.params.update({'wiki': self.COMMONS}) + + self._get('sitematrix', show, proxy, timeout) + + del self.params['wiki'] + + return self + + def top(self, wiki=None, limit=25): + """ + Print list of top viewed articles (ns=0) over the last WEEK + https://www.mediawiki.org/wiki/Extension:PageViewInfo + + Optional params: + - [wiki]: alternate wiki site (default=en.wikipedia.org) + - [limit]: show up to limit articles (max=500) + + See also: + https://en.wikipedia.org/wiki/Wikipedia_talk:Top_25_Report + """ + if wiki: + self.params.update({'wiki': wiki}) + + if 'siteinfo' not in self.cache: + self.get_info(show=False) + + print("%s mostviewed articles:" % (self.data['site'])) + + count = 0 + for item in self.data['mostviewed']: + if item['ns'] == 0: + count += 1 + print("%d. %s (%s)" % (count, item['title'], + "{:,}".format(item['count']))) + if count >= limit: + break diff --git a/nowplaying/vendor/wptools/utils.py b/nowplaying/vendor/wptools/utils.py new file mode 100644 index 00000000..e19d3ea0 --- /dev/null +++ b/nowplaying/vendor/wptools/utils.py @@ -0,0 +1,342 @@ +# -*- coding:utf-8 -*- + +""" +WPTools Utilities module. 
+""" + +from __future__ import print_function + +import sys + +import json + +from collections import defaultdict +from itertools import chain + +import lxml.etree +import lxml.html + +from lxml.etree import tostring + + +def get_infobox(ptree, boxterm="box"): + """ + Returns parse tree template with title containing as dict: + + = {: , ...} + + If simple transform fails, attempts more general assembly: + + = {'boxes': [{: <parts>}, ...], + 'count': <len(boxes)>} + """ + boxes = [] + for item in lxml.etree.fromstring(ptree).xpath("//template"): + + title = item.find('title').text + if title and boxterm in title: + + box = template_to_dict(item) + if box: + return box + + alt = template_to_dict_alt(item, title) + if alt: + boxes.append(alt) + + if boxes: + return {'boxes': boxes, 'count': len(boxes)} + + +def get_links(rlinks): + """ + returns list of titles/urls from query/parse links response + """ + if rlinks is None: + return + links = [] + for item in rlinks: + if 'url' in item: + links.append(item['url']) + if 'title' in item and 'ns' in item: + if item['ns'] == 0: # articles only + links.append(item['title']) + return sorted(links) if links else None + + +def is_text(obj, name=None): + """ + returns True if object is text-like + """ + try: # python2 + ans = isinstance(obj, basestring) + except NameError: # python3 + ans = isinstance(obj, str) + if name: + print("is_text: (%s) %s = %s" % (ans, name, obj.__class__), + file=sys.stderr) + return ans + + +def isfilename(name): + """ + returns True if name looks like a Mediawiki filename + """ + if name[0].isalnum() and name[-3:].isalpha(): + return True + return False + + + +def pretty(data): + """ + return pretty JSON + """ + return json.dumps(data, + indent=4, + sort_keys=True, + separators=(',', ': ')) + + +def stderr(msg, silent=False): + """ + write msg to stderr if not silent + """ + if not silent: + print(msg, file=sys.stderr) + + +def template_to_dict(tree, debug=0, find=False): + """ + returns wikitext template as dict + + debug = 1 + prints minimal debug info to stdout + debug > 1 + compares _iter() versus _find() results + find = True + sets values from _find() algorithm (default _iter()) + """ + + # you can compare (most) raw Infobox wikitext like this: + # https://en.wikipedia.org/wiki/TITLE?action=raw§ion=0 + + obj = defaultdict(str) + errors = [] + for item in tree: + try: + name = item.findtext('name').strip() + + if debug: + template_to_dict_debug(name, item, debug) + + find_val = template_to_dict_find(item, debug) # DEPRECATED + iter_val = template_to_dict_iter(item, debug) + + value = iter_val + if find: + value = find_val + + if name and value: + obj[name] = value.strip() + + except AttributeError: + + if isinstance(item, lxml.etree.ElementBase): + name = item.tag.strip() + text = item.text.strip() + if item.tag == 'title': + obj['infobox'] = text + else: + obj[name] = text + + except: + errors.append(lxml.etree.tostring(item)) + + if errors: + obj['errors'] = errors + + return dict(obj) + + +def template_to_dict_alt(tree, title): + """ + Returns parse tree template as {<title>: <parts>} + This is a more general parse tree infobox template parser. 
+ """ + box = [] + part = [] + + for item in tree.iter(): + + if item.tag == 'part': + if part: + box.append(part) + part = [] + + if item.tag == 'name' or item.tag == 'value': + for attr in item.keys(): + part.append({attr: item.get(attr)}) + + if item.text: + part.append(item.text.strip()) + + if item.tail: + part.append(item.tail.strip()) + + if part: + box.append(part) + + return {title.strip(): box} + + +def template_to_dict_debug(name, item, debug): + """ + Print debug statements to compare algorithms + """ + if debug == 1: + print("\n%s = " % name) + elif debug > 1: + print("\n%s" % name) + print("=" * 64) + print(lxml.etree.tostring(item)) + print() + + +def template_to_dict_find(item, debug=0): + """ + DEPRECATED: Returns infobox parsetree value using etree.find() + + Older template_to_dict() algorithm, uses etree.xpath() to "lookup" + or find specific elements, but fails to include tail text in the + order it is found, and does not _exclude_ <ext> tags (references, + etc.). Compare to template_to_dict_iter(). + """ + if debug > 1: + print("template_to_dict_find:") + + tmpl = item.find('value').find('template') + + if tmpl is not None: + value = template_to_text(tmpl, debug) + else: + value = text_with_children(item.find('value'), debug) + + if debug: + print(" find: %s" % value) + + return value + + +def template_to_dict_iter(item, debug=0): + """ + Returns infobox parsetree value using etree.iter() + + Preferred template_to_dict() algorithm, uses etree.iter() to + iterate over elements, accumulating tail text in order, but not + preserving `<ext>` tags (references, etc.). The advantage is that + it picks up MORE templates and links that may be mixed in with + `<ext>` tags, and keeps the result focused on the data. Compare to + template_to_dict_find(). + """ + valarr = [] + found_template = False + + if debug > 1: + print("template_to_dict_iter:") + + for elm in item.iter(): + + if debug > 1: + template_to_dict_iter_debug(elm) + + if elm.tag == 'value' and not found_template: + valarr.append(elm.text.strip()) + + if elm.tag == 'template': + found_template = True + valarr.append(template_to_text(elm, debug).strip()) + + if elm.tail: + valarr.append(elm.tail.strip()) + + value = " ".join([x for x in valarr if x]) + + if debug: + print(" iter: %s" % value) + + return value + + +def template_to_dict_iter_debug(elm): + """ + Print expanded element on stdout for debugging + """ + if elm.text is not None: + print(" <%s>%s</%s>" % (elm.tag, elm.text, elm.tag), end='') + if elm.tail is not None: + print(elm.tail) + else: + print() + else: + if elm.tail is not None: + print(" <%s>%s" % (elm.tag, elm.tail)) + else: + print(" <%s>" % elm.tag) + + +def template_to_text(tmpl, debug=0): + """ + convert parse tree template to text + """ + tarr = [] + for item in tmpl.itertext(): + tarr.append(item) + + text = "{{%s}}" % "|".join(tarr).strip() + + if debug > 1: + print("+ template_to_text:") + print(" %s" % text) + + return text + + +def text_with_children(node, debug=0): + """ + DEPRECATED: return text content with children (#62), sub-elements (#66) + + Only used by deprecated template_to_dict_find(), and suffers from + copypasta code smell. 
+ """ + + # https://stackoverflow.com/questions/4624062/get-all-text-inside-a-tag-in-lxml + + if sys.version.startswith('3'): # py3 needs encoding=str + parts = ([node.text] + + list(chain( + *([tostring(c, with_tail=False, encoding=str), + c.tail] for c in node.getchildren()))) + + [node.tail]) + else: + parts = ([node.text] + + list(chain( + *([tostring(c, with_tail=False), + c.tail] for c in node.getchildren()))) + + [node.tail]) + + value = ''.join(filter(lambda x: x or isinstance(x, str), parts)).strip() + + if debug > 1: + print("+ text_with_children:") + print(" %s" % value) + + return value + + +def wikidata_url(wikibase): + """ + returns Wikidata URL from wikibase + """ + if wikibase: + return 'https://www.wikidata.org/wiki/' + wikibase diff --git a/nowplaying/vendor/wptools/wikidata.py b/nowplaying/vendor/wptools/wikidata.py new file mode 100644 index 00000000..25eeca7c --- /dev/null +++ b/nowplaying/vendor/wptools/wikidata.py @@ -0,0 +1,388 @@ +# -*- coding:utf-8 -*- + +""" +WPTools Wikidata module +~~~~~~~~~~~~~~~~~~~~~~~ + +Support for getting Wikidata. + +https://www.wikidata.org/wiki/Wikidata:Data_access +""" + +import collections +import re + +from . import core +from . import utils + + +class WPToolsWikidata(core.WPTools): + """ + WPToolsWikidata class + """ + + def __init__(self, *args, **kwargs): + """ + Returns a WPToolsWikidata object + + Optional positional {params}: + - [title]: <str> Mediawiki page title, file, category, etc. + + Optional keyword {params}: + - [lang]: <str> Mediawiki language code (default='en') + - [variant]: <str> Mediawiki language variant + - [wikibase]: <str> Wikidata database ID (e.g. 'Q1') + + Optional keyword {flags}: + - [silent]: <bool> do not echo page data if True + - [skip]: <list> skip actions in this list + - [verbose]: <bool> verbose output to stderr if True + """ + super(WPToolsWikidata, self).__init__(*args, **kwargs) + + wikibase = kwargs.get('wikibase') + if wikibase: + self.params.update({'wikibase': wikibase}) + + self.user_labels = None + + def _get_entity_prop(self, entity, prop): + """ + returns Wikidata entity property value + """ + variant = self.params.get('variant') + lang = self.params.get('lang') + + if entity.get(prop): + ent = entity[prop] + try: + return ent[variant or lang].get('value') + except AttributeError: + return ent.get('value') + + def _marshal_claims(self, query_claims): + """ + set Wikidata entities from query claims + """ + claims = reduce_claims(query_claims) + # self.data['claimq'] = query_claims + self.data['claims'] = claims + + entities = set() + for eid in claims: + if self.user_labels: + if eid in self.user_labels or eid == 'P31': + entities.add(eid) # P (property) + else: + continue # get only wanted entities + else: + entities.add(eid) # P (property) + + for val in claims[eid]: + if utils.is_text(val) and re.match(r'^Q\d+$', val): + entities.add(val) # Q (item) + + self.data['entities'] = list(entities) + + def _pop_entities(self, limit=50): + """ + returns up to limit entities and pops them off the list + """ + pop = self.data['entities'][:limit] + del self.data['entities'][:limit] + return pop + + def _post_labels_updates(self): + """ + updates possible after getting labels + """ + self._update_wikidata() + self._update_images() + self._update_what() + + def _query(self, action, qobj): + """ + returns wikidata query string + """ + if action == 'labels': + return qobj.labels(self._pop_entities()) + elif action == 'wikidata': + return qobj.wikidata(self.params.get('title'), + 
self.params.get('wikibase')) + + def _set_data(self, action): + """ + capture Wikidata API response data + """ + if action == 'labels': + self._set_labels() + + if action == 'wikidata': + self._set_wikidata() + self.get_labels(show=False) + + def _set_labels(self): + """ + set entity labels from get_labels() + """ + data = self._load_response('labels') + entities = data.get('entities') or [] + + for ent in entities: + label = self._get_entity_prop(entities[ent], 'labels') + self.data['labels'][ent] = label + + def _set_title(self, item): + """ + attempt to set title from wikidata + """ + title = None + lang = self.params['lang'] + label = self.data['label'] + + if item.get('sitelinks'): + for link in item['sitelinks']: + if link == "%swiki" % lang: + title = item['sitelinks'][link]['title'] + self.data['title'] = title.replace(' ', '_') + + if not self.data.get('title') and label: + self.data['title'] = label.replace(' ', '_') + + if self.data.get('title') and not self.params.get('title'): + self.params['title'] = self.data['title'] + + def _set_wikidata(self): + """ + set attributes derived from Wikidata (action=wbentities) + """ + self.data['labels'] = {} + self.data['wikidata'] = {} + + data = self._load_response('wikidata') + entities = data.get('entities') + item = entities.get(next(iter(entities))) + + self.data['wikidata_pageid'] = item.get('pageid') + + aliases = item.get('aliases') + if aliases: + aliases = [x['value'] for x in aliases[self.params['lang']]] + self.data['aliases'] = aliases + + modified = item.get('modified') + try: + self.data['modified'].update({'wikidata': modified}) + except KeyError: + self.data['modified'] = {'wikidata': modified} + + wikibase = item.get('id') + if wikibase: + self.data['wikibase'] = wikibase + self.data['wikidata_url'] = utils.wikidata_url(wikibase) + + self.data['description'] = self._get_entity_prop(item, 'descriptions') + self.data['label'] = self._get_entity_prop(item, 'labels') + + self._marshal_claims(item.get('claims')) + self._set_title(item) + + def _update_images(self): + """ + add images from Wikidata + """ + wd_images = self.data['claims'].get('P18') # image + + if wd_images: + if not isinstance(wd_images, list): + wd_images = [wd_images] + + if 'image' not in self.data: + self.data['image'] = [] + + for img_file in wd_images: + self.data['image'].append({'file': img_file, + 'kind': 'wikidata-image'}) + + def _update_what(self): + """ + set what this thing is! 
"instance of (P31)" + """ + if 'P31' not in self.data['claims']: # missing Wikidata + msg = ("Note: Wikidata item %s" % self.data['wikibase'], + "missing 'instance of' (P31)") + utils.stderr(" ".join(msg)) + return + + instance_of = self.data['claims']['P31'][0] + labels = self.data['labels'] + + if instance_of in labels: + self.data['what'] = labels[instance_of] + + def _update_wikidata(self): + """ + set wikidata from claims and labels + """ + claims = self.data['claims'] + + for ent in claims: + + plabel = self.data['labels'].get(ent) # P (property) label + if plabel: + plabel = "%s (%s)" % (plabel, ent) + + claim = [] + for item in claims[ent]: + ilabel = item + if utils.is_text(item) and re.match(r'^Q\d+$', item): + ilabel = self.data['labels'].get(item) # Q (item) label + if ilabel: + ilabel = "%s (%s)" % (ilabel, item) + + if len(claims[ent]) == 1: + claim = ilabel + else: + claim.append(ilabel) + + if plabel and ilabel: + self.data['wikidata'][plabel] = claim + + def get_labels(self, show=False, proxy=None, timeout=0): + """ + GET Wikidata:API (action=wbgetentities) for claims labels + https://www.wikidata.org/w/api.php?action=help&modules=wbgetentities + + Required {data}: entities, claims + data['claims']: {'P17': ['Q17'], 'P910': ['Q8854938'], ...} + data['entities']: ['P17', 'Q17', 'P910', 'Q8854938', ...] + + Data captured: + labels: {'P17': 'country', 'Q17': 'Japan', ...} + wikidata: {'country (P17)': 'Japan (Q17)', ...} + + Use get_wikidata() to populate data['entities'] + """ + if 'entities' not in self.data: + utils.stderr("No entities found.") + return + + skip_flag = False + if 'skip' in self.flags and 'labels' in self.flags['skip']: + skip_flag = True + + while 'entities' in self.data and self.data['entities']: + if skip_flag: + break + self._get('labels', show, proxy, timeout) + + if 'entities' in self.data: + del self.data['entities'] + + self._post_labels_updates() + + return self + + def get_wikidata(self, show=True, proxy=None, timeout=0): + """ + GET Wikidata:API (action=wbgetentities) wikidata + https://www.wikidata.org/w/api.php?action=help&modules=wbgetentities + + Required {params}: title OR wikibase + - title: <str> Mediawiki page title, file, category, etc. 
+ - wikibase: <str> Wikidata item ID + + Optional {params}: + - [lang]: <str> Mediawiki language code (default='en') + - [variant]: <str> Mediawiki language variant + + Optional arguments: + - [show]: <bool> echo page data if true + - [proxy]: <str> use this HTTP proxy + - [timeout]: <int> timeout in seconds (0=wait forever) + + Data captured: + - aliases: <list> list of "also known as" + - claims: <dict> intermediate Wikidata claims (compare to cache) + - description: <str> Wikidata description + - image: <dict> {wikidata-image} Wikidata Property:P18 + - label: <str> Wikidata label + - labels: <str> list of resolved labels + - modified (wikidata): <str> ISO8601 date and time + - pageid: <int> Wikipedia database ID + - requests: list of request actions made + - title: <str> article title + - what: <str> Wikidata Property:P31 "instance of" + - wikibase: <str> Wikidata item ID + - wikidata: <dict> resolved Wikidata claims + - wikidata_url: <str> Wikidata URL + """ + title = self.params.get('title') + wikibase = self.params.get('wikibase') + + if not wikibase and not title: + err = "get_wikidata needs wikibase or title" + raise LookupError(err) + + self._get('wikidata', show, proxy, timeout) + + return self + + def wanted_labels(self, labels): + """ + Specify only WANTED labels to minimize get_labels() requests + + Args: + - labels: <list> of wanted labels. + + Example: + page.wanted_labels(['P18', 'P31']) + """ + if not isinstance(labels, list): + raise ValueError("Input labels must be a list.") + + self.user_labels = labels + + +################################################################ + + +def reduce_claims(query_claims): + """ + returns claims as reduced dict {P: [Q's or values]} + P = property + Q = item + """ + claims = collections.defaultdict(list) + + for claim, entities in query_claims.items(): + + for ent in entities: + + try: + snak = ent.get('mainsnak') + snaktype = snak.get('snaktype') + value = snak.get('datavalue').get('value') + except AttributeError: + claims[claim] = [] + + try: + if snaktype != 'value': + val = snaktype + elif value.get('id'): + val = value.get('id') + elif value.get('text'): + val = value.get('text') + elif value.get('time'): + val = value.get('time') + else: + val = value + except AttributeError: + val = value + + if not val or not [x for x in val if x]: + raise ValueError("%s %s" % (claim, ent)) + + claims[claim].append(val) + + return dict(claims) diff --git a/nowplaying/vendor/wptools/wptool.py b/nowplaying/vendor/wptools/wptool.py new file mode 100644 index 00000000..707acacc --- /dev/null +++ b/nowplaying/vendor/wptools/wptool.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python -u +# -*- coding:utf-8 -*- +""" +Command line interface to wptools. 
+""" + +from __future__ import print_function + +import argparse +import re +import sys +import time +import textwrap +from nowplaying.vendor import wptools + +from nowplaying.vendor.wptools.query import WPToolsQuery + + +def _html_image(page): + """ + returns HTML img tag + """ + source = _image(page) + if not source: + return + alt = page.data.get('label') or page.data.get('title') + img = "<img src=\"%s\"" % source + img += " alt=\"%s\" title=\"%s\" " % (alt, alt) + img += "align=\"right\" width=\"240\">" + return img + + +def _html_title(page): + """ + returns Wiki-linked HTML title + """ + link = "<a href=\"%s\">%s</a>" % (page.data.get('url'), + page.data.get('title')) + desc = page.data.get('description') + if desc: + link += "—<i>%s</i>" % desc + else: + link += "—<i>description</i>" + if link: + return "<p>%s</p>" % link + + +def _image(page): + """ + returns (preferred) image from wptools object + """ + pageimage = page.images(token='pageimage') + if pageimage: + return pageimage[0]['url'] + + +def _page_html(page): + """ + returns assembled HTML output + """ + out = [] + out.append(_html_title(page)) + out.append(_html_image(page)) + out.append(page.data.get('extract')) + return "\n".join([x for x in out if x]) + + +def _page_text(page, nowrap=False): + """ + returns assembled text output + """ + title = page.data['title'] + title = "%s\n%s" % (title, "=" * len(title)) + + desc = page.data.get('description') + if desc: + desc = "_%s_" % desc + + img = _text_image(page) + + pars = page.data.get('extext') + + if pars: + # pars = pars.replace(' * ', '\n * ') + pars = re.sub(r'[ ]+\*[ ]+', '* ', pars) + + if pars and not nowrap: + parlist = [] + for par in pars.split("\n\n"): + parlist.append("\n".join(textwrap.wrap(par))) + + disambiguation = page.data.get('disambiguation') + if disambiguation: + parlist.append(' * ' + "\n * ".join(page.data.get('links'))) + + pars = "\n\n".join(parlist) + + url = '<%s>' % page.data['url'] + + txt = [] + txt.append(title) + txt.append(desc) + txt.append(url) + txt.append(pars) + txt.append(img) + + return "\n\n".join([x for x in txt if x]) + + +def _safe_exit(start, output): + """ + exit without breaking pipes + """ + try: + sys.stdout.write(output) + sys.stdout.flush() + except TypeError: # python3 + sys.stdout.write(str(output, 'utf-8')) + sys.stdout.flush() + except IOError: + pass + + seconds = time.time() - start + print("\n\n%5.3f seconds" % (seconds), file=sys.stderr) + + +def _text_image(page): + """ + returns text image URL + """ + img = None + alt = page.data.get('label') or page.data.get('title') + source = _image(page) + if source: + img = "![%s](%s)" % (alt, source) + return img + + +def get(args): + """ + invoke wptools and assemble selected output + """ + + html = args.H + lang = args.l + nowrap = args.n + query = args.q + silent = args.s + title = args.t + verbose = args.v + wiki = args.w + + if query: + qobj = WPToolsQuery(lang=lang, wiki=wiki) + if title: + return qobj.query(title) + return qobj.random() + + page = wptools.page(title, lang=lang, silent=silent, + verbose=verbose, wiki=wiki) + + try: + page.get_query() + except (StandardError, ValueError, LookupError): + return "NOT_FOUND" + + if not page.data.get('extext'): + out = page.cache['query']['query'] + + out = _page_text(page, nowrap) + if html: + out = _page_html(page) + + try: + return out.encode('utf-8') + except KeyError: + return out + + +def parse_args(): + """ + parse main() args + """ + description = ( + "Get Wikipedia article info and Wikidata via MediaWiki 
APIs.\n\n" + "Gets a random English Wikipedia article by default, or in the\n" + "language -lang, or from the wikisite -wiki, or by specific\n" + "title -title. The output is a plain text extract unless -HTML.") + epilog = ("Powered by https://github.com/siznax/wptools/ %s" + % wptools.__version__) + argp = argparse.ArgumentParser( + description=description, + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=epilog) + argp.add_argument("-H", "-HTML", action='store_true', + help="output HTML extract") + argp.add_argument("-l", "-lang", default='en', + help="language code") + argp.add_argument("-n", "-nowrap", action='store_true', + help="do not wrap text") + argp.add_argument("-q", "-query", action='store_true', + help="show query and exit") + argp.add_argument("-s", "-silent", action='store_true', + help="quiet output to stderr") + argp.add_argument("-t", "-title", help="get a specific title") + argp.add_argument("-v", "-verbose", action='store_true', + help="HTTP status to stderr") + argp.add_argument("-w", "-wiki", + help="use alternative wikisite") + return argp.parse_args() + + +def main(args=None): + """ + invoke wptools and exit safely + """ + if not args: + args = parse_args() + start = time.time() + output = get(args) + _safe_exit(start, output) + + +if __name__ == "__main__": + main(parse_args()) diff --git a/pyproject.toml b/pyproject.toml index fb5a10e2..2075980b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ packages = [ "nowplaying.vendor.audio_metadata.tbm_utils", "nowplaying.vendor.discogs_client", "nowplaying.vendor.pid", + "nowplaying.vendor.wptools" ] license-files = ["LICENSE.txt"] diff --git a/requirements-run.txt b/requirements-run.txt index 79e7391a..41ec36f2 100644 --- a/requirements-run.txt +++ b/requirements-run.txt @@ -45,3 +45,9 @@ python-dateutil # required to install # versioningit==2.2.0 + +# +# wptools +# +BeautifulSoup4 +lxml diff --git a/vendor.txt b/vendor.txt index 203ba973..abdaf80f 100644 --- a/vendor.txt +++ b/vendor.txt @@ -1,3 +1,4 @@ git+https://github.com/whatsnowplaying/audio-metadata.git@nowplaying git+https://github.com/whatsnowplaying/discogs_client@nowplaying git+https://github.com/whatsnowplaying/pid@main +git+https://github.com/whatsnowplaying/wptools@nowplaying
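
For reference, a minimal usage sketch of the Wikidata path in the vendored library added by this patch, following the flow in nowplaying/vendor/wptools/wikidata.py (get_wikidata() resolving claims, then get_labels() resolving labels). This is an illustrative sketch only, not part of the patch itself: the item ID 'Q1299' and the property list passed to wanted_labels() are assumptions chosen for the example, and the call performs live Wikidata API requests.

#!/usr/bin/env python3
# Illustrative sketch only -- not part of this patch.
# Exercises the vendored wptools Wikidata support on its own.
from nowplaying.vendor import wptools

# 'Q1299' is an example Wikidata item ID; any artist item works here.
page = wptools.page(wikibase='Q1299', silent=True)

# Restrict label resolution to the properties of interest so that
# get_labels() issues as few wbgetentities requests as possible
# (see wanted_labels() in wikidata.py).
page.wanted_labels(['P18', 'P31'])

page.get_wikidata()

print(page.data.get('what'))    # resolved P31 "instance of" label
print(page.data.get('claims'))  # reduced claims dict: {P-id: [values]}
print(page.data.get('image'))   # P18 entries, kind='wikidata-image'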