From 41f548af2ec898886668b9831d386dbd6a6c91d1 Mon Sep 17 00:00:00 2001 From: jmcb Date: Tue, 30 May 2023 16:05:36 +0100 Subject: [PATCH 1/7] Add gitignore --- .gitignore | 160 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6769e21 --- /dev/null +++ b/.gitignore @@ -0,0 +1,160 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ \ No newline at end of file From 07a876c0c75275c0841b2f3860d93d98211efbcc Mon Sep 17 00:00:00 2001 From: jmcb Date: Tue, 30 May 2023 16:17:25 +0100 Subject: [PATCH 2/7] Switch to pyproject.toml --- pyproject.toml | 22 ++++++++++++++++++++++ setup.py | 15 --------------- 2 files changed, 22 insertions(+), 15 deletions(-) create mode 100644 pyproject.toml delete mode 100644 setup.py diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..afac9a4 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[project] +name = "AUR" +version = "2021.11.20" +description = "AUR-related modules and helper utilities (aurploader, aurquery, aurtomatic)." +license = {file = "COPYING"} +authors = [ + {name = "Xyne", email = "xyne@archlinux.org"}, +] + +[project.urls] +Home = "http://xyne.dev/projects/python3-aur" + +[project.scripts] +aurpkglist = "aurpkglist" +aurploder = "aurploder" +aurquery = "aurquery" +aurtomatic = "aurtomatic" +aurtus = "aurtus" diff --git a/setup.py b/setup.py deleted file mode 100644 index 63da98c..0000000 --- a/setup.py +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env python3 - -from distutils.core import setup -import time - -setup( - name='AUR', - version=time.strftime('%Y.%m.%d.%H.%M.%S', time.gmtime(1637376062)), - description='AUR-related modules and helper utilities (aurploader, aurquery, aurtomatic).', - author='Xyne', - author_email='gro xunilhcra enyx, backwards', - url='http://xyne.dev/projects/python3-aur', - packages=['AUR'], - scripts=['aurpkglist', 'aurploader', 'aurquery', 'aurtomatic', 'aurtus'] -) From 0d22b5e3b43cdd42ba6a78eb10967e03e69a211c Mon Sep 17 00:00:00 2001 From: jmcb Date: Tue, 30 May 2023 16:17:50 +0100 Subject: [PATCH 3/7] Apply the Black formatter --- AUR/AurPkg.py | 41 +- AUR/Aurtomatic.py | 1317 +++++++++++++++++++++---------------------- AUR/PkgList.py | 129 +++-- AUR/RPC.py | 493 ++++++++-------- AUR/SRCINFO.py | 314 +++++------ AUR/common.py | 10 +- examples/comment.py | 24 +- examples/msearch.py | 72 +-- examples/vote.py | 12 +- 9 files changed, 1205 insertions(+), 1207 deletions(-) diff --git a/AUR/AurPkg.py b/AUR/AurPkg.py index dd07ab2..cfd2a8a 100644 --- a/AUR/AurPkg.py +++ b/AUR/AurPkg.py @@ -25,25 +25,23 @@ # ---------------------------------- PkgSet ---------------------------------- # -class AurPkgSet(XCPF.ArchPkg.PkgSet): +class AurPkgSet(XCPF.ArchPkg.PkgSet): def __init__(self, pkgs=None): - accessors = { - 'name': lambda x: x['Name'], - 'version': lambda x: x['Version'] - } + accessors = {"name": lambda x: x["Name"], "version": lambda x: x["Version"]} super(self.__class__, self).__init__(accessors, pkgs=pkgs) # ---------------------------- Buildable Packages ---------------------------- # + class AurBuildablePkgFactory(XCPF.ArchPkg.BuildablePkgFactory): - ''' + """ Wrapper class to convert AUR packages to AurBuildablePkgs. arch: Target architecture. asi: AUR.SRCINFO.AurSrcinfo instance. - ''' + """ def __init__(self, arch, *args, asi=None, **kwargs): self.arch = arch @@ -71,14 +69,13 @@ def __init__(self, arch, pkg, asi=None): def get_srcinfo(self): if self.srcinfo is None: try: - pkgbases_and_pkgnames = ((self.pkg['PackageBase'], self.pkg['Name']),) + pkgbases_and_pkgnames = ((self.pkg["PackageBase"], self.pkg["Name"]),) for si in self.asi.get_pkginfo(pkgbases_and_pkgnames): self.srcinfo = si break except urllib.error.HTTPError as e: raise XCPF.ArchPkg.BuildablePkgError( - 'failed to retrieve .SRCINFO for {}'.format(self.pkg.name), - error=e + "failed to retrieve .SRCINFO for {}".format(self.pkg.name), error=e ) return self.srcinfo @@ -86,36 +83,36 @@ def buildable(self): return True def maintainers(self): - m = self.pkg['Maintainer'] + m = self.pkg["Maintainer"] if m: yield m def pkgname(self): - return self.pkg.get('Name') + return self.pkg.get("Name") def version(self): - return self.pkg.get('Version') + return self.pkg.get("Version") def pkgbase(self): - return self.pkg.get('PackageBase') + return self.pkg.get("PackageBase") def repo(self): - return 'AUR' + return "AUR" def last_modified(self): - return self.pkg.get('LastModified') + return self.pkg.get("LastModified") def last_packager(self): - return self.pkg.get('LastPackager') + return self.pkg.get("LastPackager") def with_arch_deps(self, field): # for SRCINFO field = field.lower() srcinfo = self.get_srcinfo() - if self.arch == 'any': + if self.arch == "any": fields = [field] else: - fields = [field, '{}_{}'.format(field, self.arch)] + fields = [field, "{}_{}".format(field, self.arch)] for f in fields: try: @@ -126,10 +123,10 @@ def with_arch_deps(self, field): continue def deps(self): - return self.with_arch_deps('Depends') + return self.with_arch_deps("Depends") def makedeps(self): - return self.with_arch_deps('MakeDepends') + return self.with_arch_deps("MakeDepends") def checkdeps(self): - return self.with_arch_deps('CheckDepends') + return self.with_arch_deps("CheckDepends") diff --git a/AUR/Aurtomatic.py b/AUR/Aurtomatic.py index faf86a4..bcf410e 100644 --- a/AUR/Aurtomatic.py +++ b/AUR/Aurtomatic.py @@ -41,738 +41,731 @@ ################################### Globals #################################### -INDEX_URL = AUR.common.AUR_URL + '/index.php' -LOGIN_URL = AUR.common.AUR_URL + '/login/' +INDEX_URL = AUR.common.AUR_URL + "/index.php" +LOGIN_URL = AUR.common.AUR_URL + "/login/" # PKGSUBMIT_URL = AUR.common.AUR_URL + '/pkgsubmit.php' -PKGSUBMIT_URL = AUR.common.AUR_URL + '/submit/' -RPC_URL = AUR.common.AUR_URL + '/rpc.php' -ACTION_URL = AUR.common.AUR_URL + '/pkgbase' +PKGSUBMIT_URL = AUR.common.AUR_URL + "/submit/" +RPC_URL = AUR.common.AUR_URL + "/rpc.php" +ACTION_URL = AUR.common.AUR_URL + "/pkgbase" -TOKENSCRAPER_URL = AUR.common.AUR_URL + '/packages/python3-aur/' +TOKENSCRAPER_URL = AUR.common.AUR_URL + "/packages/python3-aur/" PACKAGE_ACTIONS = ( - 'unflag', - 'vote', - 'unvote', - 'notify', - 'unnotify', + "unflag", + "vote", + "unvote", + "notify", + "unnotify", ) FORM_ACTIONS = { - 'vote' : 'Vote', - 'unvote' : 'UnVote', - 'notify' : 'Notify', - 'unnotify' : 'UnNotify', - 'flag' : 'Flag', - 'unflag' : 'UnFlag', - 'disown' : 'Disown', - 'delete' : 'Delete', - 'adopt' : 'Adopt' + "vote": "Vote", + "unvote": "UnVote", + "notify": "Notify", + "unnotify": "UnNotify", + "flag": "Flag", + "unflag": "UnFlag", + "disown": "Disown", + "delete": "Delete", + "adopt": "Adopt", } -VALUE_ACTIONS = ('flag', 'comment', 'setkeywords') +VALUE_ACTIONS = ("flag", "comment", "setkeywords") -DO_ACTIONS = ('adopt', 'disown', 'delete') +DO_ACTIONS = ("adopt", "disown", "delete") ACCOUNT_RESULTS_PER_PAGE = 50 - ################################## Functions ################################### -def get_default_cookiejar_path(): - ''' - Get the default path to the cookie jar. - ''' - cache_dir = xdg.BaseDirectory.save_cache_path(AUR.common.XDG_NAME) - return os.path.join(cache_dir, 'cookiejar.txt') +def get_default_cookiejar_path(): + """ + Get the default path to the cookie jar. + """ + cache_dir = xdg.BaseDirectory.save_cache_path(AUR.common.XDG_NAME) + return os.path.join(cache_dir, "cookiejar.txt") def load_login_file(fpath): - ''' - Load login name and password from file. - ''' - with open(fpath) as f: - name = f.readline().rstrip('\n') - passwd = f.readline().rstrip('\n') - return name, passwd + """ + Load login name and password from file. + """ + with open(fpath) as f: + name = f.readline().rstrip("\n") + passwd = f.readline().rstrip("\n") + return name, passwd @XCGF.deprecated def prompt_comment(pkginfo): - ''' - Deprecated comment prompt function. - ''' - return prompt_input(pkginfo, 'Enter a comment.') - + """ + Deprecated comment prompt function. + """ + return prompt_input(pkginfo, "Enter a comment.") def prompt_input(pkginfo, prompt): - ''' - Prompt the user for input. - - The EDITOR environment variable must be set. - ''' - editor = os.getenv('EDITOR') - if not editor: - raise AurtomaticError('environment variable "EDITOR" is not set') - if os.path.isdir('/dev/shm'): - dpath = '/dev/shm' - else: - dpath = None - with tempfile.TemporaryDirectory(dir=dpath) as d: - fpath = os.path.join(d, pkginfo['Name']) - marker = '###>' - header = ( - 'Package: {}'.format(pkginfo['Name']), - 'Webpage: {}/packages.php?ID={!s}'.format(AUR.common.AUR_URL, pkginfo['ID']), - 'Lines beginning with "{}" are ignored.'.format(marker), - 'If the rest of the file is empty, no comment will be submitted.', - prompt - ) - with open(fpath, 'w') as f: - for line in header: - f.write('{} {}\n'.format(marker, line)) - p = subprocess.Popen([editor, fpath]) - p.wait() - comment = '' - with open(fpath) as f: - for line in f: - if line.startswith(marker): - continue - comment += line - return comment.strip() - - + """ + Prompt the user for input. + + The EDITOR environment variable must be set. + """ + editor = os.getenv("EDITOR") + if not editor: + raise AurtomaticError('environment variable "EDITOR" is not set') + if os.path.isdir("/dev/shm"): + dpath = "/dev/shm" + else: + dpath = None + with tempfile.TemporaryDirectory(dir=dpath) as d: + fpath = os.path.join(d, pkginfo["Name"]) + marker = "###>" + header = ( + "Package: {}".format(pkginfo["Name"]), + "Webpage: {}/packages.php?ID={!s}".format( + AUR.common.AUR_URL, pkginfo["ID"] + ), + 'Lines beginning with "{}" are ignored.'.format(marker), + "If the rest of the file is empty, no comment will be submitted.", + prompt, + ) + with open(fpath, "w") as f: + for line in header: + f.write("{} {}\n".format(marker, line)) + p = subprocess.Popen([editor, fpath]) + p.wait() + comment = "" + with open(fpath) as f: + for line in f: + if line.startswith(marker): + continue + comment += line + return comment.strip() ################################### Classes #################################### + class AurtomaticError(Exception): - ''' - Exceptions raised by AUR interactions and related functions. - ''' - def __init__(self, msg): - self.msg = msg + """ + Exceptions raised by AUR interactions and related functions. + """ - def __str__(self): - return self.msg + def __init__(self, msg): + self.msg = msg + def __str__(self): + return self.msg class TokenScraper(html.parser.HTMLParser): - ''' - Scrape the hidden token field required for submitting forms. - ''' - def __init__(self): - super(self.__class__, self).__init__() - self.parse_options = False - self.token = None - self.errors = list() - self.parse_errorlist = False - - def handle_starttag(self, tag, attrs): - # Get the hidden token value. - if tag == 'input': - a = dict(attrs) - if a['type'] == 'hidden' and a['name'] == 'token' and a['value']: - self.token = a['value'] - - elif tag == 'ul': - a = dict(attrs) - try: - if a['class'] == 'errorlist': - self.parse_errorlist = True - except KeyError: - pass - - def handle_endtag(self, tag): - if tag == 'ul' and self.parse_errorlist: - self.parse_errorlist = False - - def handle_data(self, data): - if self.parse_errorlist: - self.errors.append(data) - + """ + Scrape the hidden token field required for submitting forms. + """ + + def __init__(self): + super(self.__class__, self).__init__() + self.parse_options = False + self.token = None + self.errors = list() + self.parse_errorlist = False + + def handle_starttag(self, tag, attrs): + # Get the hidden token value. + if tag == "input": + a = dict(attrs) + if a["type"] == "hidden" and a["name"] == "token" and a["value"]: + self.token = a["value"] + + elif tag == "ul": + a = dict(attrs) + try: + if a["class"] == "errorlist": + self.parse_errorlist = True + except KeyError: + pass + + def handle_endtag(self, tag): + if tag == "ul" and self.parse_errorlist: + self.parse_errorlist = False + + def handle_data(self, data): + if self.parse_errorlist: + self.errors.append(data) class AccountScraper(html.parser.HTMLParser): - ''' - Scrape account data from the account search results page. - ''' - def __init__(self): - super(self.__class__, self).__init__() - self.headers = list() - self.accounts = list() - self.account = None - self.in_h2 = False - self.in_results = False - self.header = False - self.account_info = False - - def handle_starttag(self, tag, attrs): - if tag == 'h2': - self.in_h2 = True - - elif not self.in_results: - return - - elif tag == 'th': - self.header = True - - elif tag == 'td': - self.account_info = True - self.field_data = '' - - elif tag == 'tr': - self.account = list() - - def handle_endtag(self, tag): - if tag == 'h2': - self.in_h2 = False - - elif not self.in_results: - return - - elif tag == 'th': - self.header = False - - elif tag == 'td': - self.account_info = False - data = self.field_data.strip() - if not data: - data = None - self.account.append(data) - - elif tag == 'tr': - if self.account: - self.accounts.append(self.account) - self.account = list() - - elif tag == 'table': - self.in_results = False - - def handle_data(self, data): - if self.in_h2 and data.strip() == 'Accounts': - self.in_results = True - elif not self.in_results: - return - elif self.header: - data = data.strip() - self.headers.append(data.strip()) - elif self.account_info: - data = data.strip() - if data == ' ': - data = '' - self.field_data += data - + """ + Scrape account data from the account search results page. + """ + + def __init__(self): + super(self.__class__, self).__init__() + self.headers = list() + self.accounts = list() + self.account = None + self.in_h2 = False + self.in_results = False + self.header = False + self.account_info = False + + def handle_starttag(self, tag, attrs): + if tag == "h2": + self.in_h2 = True + + elif not self.in_results: + return + + elif tag == "th": + self.header = True + + elif tag == "td": + self.account_info = True + self.field_data = "" + + elif tag == "tr": + self.account = list() + + def handle_endtag(self, tag): + if tag == "h2": + self.in_h2 = False + + elif not self.in_results: + return + + elif tag == "th": + self.header = False + + elif tag == "td": + self.account_info = False + data = self.field_data.strip() + if not data: + data = None + self.account.append(data) + + elif tag == "tr": + if self.account: + self.accounts.append(self.account) + self.account = list() + + elif tag == "table": + self.in_results = False + + def handle_data(self, data): + if self.in_h2 and data.strip() == "Accounts": + self.in_results = True + elif not self.in_results: + return + elif self.header: + data = data.strip() + self.headers.append(data.strip()) + elif self.account_info: + data = data.strip() + if data == " ": + data = "" + self.field_data += data class Aurtomatic(object): - ''' - A user object for interactive actions. - ''' - - def __init__( - self, - cookiejar_path=None, - cookiejar=None, - token=None - ): - ''' - cookiejar: a MozillaCookieJar object - - token: a user token for submitting form data - ''' - - if cookiejar_path is None: - cookiejar_path = get_default_cookiejar_path() - self.cookiejar_path = cookiejar_path - - if cookiejar is None: - self.cookiejar = http.cookiejar.MozillaCookieJar() - self.load_cookies() - else: - self.cookiejar = cookiejar - - # TODO - # Find way to use this with URL opener. (urlopen accepts a capath arg) - # CA_PATH = '/etc/ssl/certs' - self.opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self.cookiejar)) - self.token = token + """ + A user object for interactive actions. + """ -# self.rpc = AUR.RPC.AUR(ttl=0, clean=False) - self.rpc = AUR.RPC.AurRpc() + def __init__(self, cookiejar_path=None, cookiejar=None, token=None): + """ + cookiejar: a MozillaCookieJar object + token: a user token for submitting form data + """ + if cookiejar_path is None: + cookiejar_path = get_default_cookiejar_path() + self.cookiejar_path = cookiejar_path - def get_info(self, pkgname): - ''' - Get package information from the RPC interface. - ''' - for pkg in self.rpc.info(pkgname): - return pkg - - + if cookiejar is None: + self.cookiejar = http.cookiejar.MozillaCookieJar() + self.load_cookies() + else: + self.cookiejar = cookiejar - def load_token(self): - ''' - Attempt to load the hidden token. If the token is empty after this operation - then the user is not currently logged in, so it doubles as a login check. - ''' - parser = TokenScraper() - with self.opener.open(TOKENSCRAPER_URL) as f: - parser.feed(f.read().decode()) - if parser.token: - self.token = parser.token - return True - else: - return False - - - - def login(self, user=None, passwd=None, login_file=None, remember_me=True): - ''' - Log in to the AUR. - ''' - if login_file is not None: - user, passwd = load_login_file(login_file) - - if user is None or passwd is None: - logging.info("logging in to the AUR") - - if user is None: - user = input('Username: ') - - if passwd is None: - passwd = getpass.getpass() - - data = [ - ('user', user), - ('passwd', passwd) - ] - - if remember_me: - data.append(('remember_me', '1')) - - data = urllib.parse.urlencode(data).encode('UTF-8') - - with self.opener.open(LOGIN_URL, data) as f: - pass - - - - # python3-AUR could be used to cache the data, but sometimes the data must be - # fresh, such as when confirming the upload. - def submit_package_form( - self, pkginfo, action, - confirm=False, merge_into=None, value=None, comment=None, comment_id=None - ): - ''' - Submit a form to the AUR. - ''' - - if comment is not None: - XCGF.warn_deprecated('keyword argument "comment" in submit_package_form is deprecated in favor of "value"') - value = comment - - ID = pkginfo['ID'] - url = ACTION_URL + '/{}/'.format(pkginfo['PackageBase']) - - #Perform one of the link-based package actions. - if action in PACKAGE_ACTIONS: - url += action - data = ( - ('token', self.token), - ) - - elif action in FORM_ACTIONS: - if action in DO_ACTIONS or action == 'flag': - ID = pkginfo['PackageBaseID'] - a = FORM_ACTIONS[action] - data = [ - ('IDs[{!s}]'.format(ID), '1'), - ('ID', ID), - ('token', self.token), - ('do_{}'.format(a), a) - ] - if confirm: - data.append(('confirm', '1')) - if merge_into: - data.append(('merge_Into', merge_into)) - if action == 'flag': - if not value: - value = prompt_input(pkginfo, 'Why you are flagging this package?') - data.append(('comments', value)) - - elif action == 'comment': - if not value: - value = prompt_input(pkginfo, 'Enter a comment.') - if value: - data = ( - ('action', 'do_AddComment'), - ('ID', ID), - ('token', self.token), - ('comment', value) + # TODO + # Find way to use this with URL opener. (urlopen accepts a capath arg) + # CA_PATH = '/etc/ssl/certs' + self.opener = urllib.request.build_opener( + urllib.request.HTTPCookieProcessor(self.cookiejar) ) - else: - raise AurtomaticError("no comment submitted") - - elif action == 'setkeywords': - if value is None: - value = prompt_input(pkginfo, 'Enter keywords (or nothing to clear them).') - if value: - if not isinstance(value, str): - try: - value = ' '.join(value) - except TypeError: - value = str(value) - else: - value = '' - data = ( - ('action', 'do_SetKeywords'), - ('token', self.token), - ('keywords', value) - ) - - elif action == 'do_DeleteComment': - if comment_id: - data = ( - ('action', 'do_DeleteComment'), - ('comment_id', comment_id), - ('token', self.token), - ('submit', '1') - ) - else: - raise AurtomaticError('no comment ID submitted for do_DeleteComment') - - else: - raise AurtomaticError('unrecognized form action: {}'.format(action)) - - logging.debug('POSTing data to {}: {}'.format(url, data)) - data = urllib.parse.urlencode(data).encode('UTF-8') - with self.opener.open(url, data) as f: - pass - - - - def search_accounts( - self, - username=None, - typ=None, - suspended=False, - email=None, - realname=None, - ircname=None, - sortby=None, - max_results=ACCOUNT_RESULTS_PER_PAGE, - ): - ''' - Submit a search form and scrape the results. - - Valid types: - u: normal user - t: trusted user - d: developer - td: trusted user & developer - - Valid sortby options: - u: user name - t: account type - r: real name - i: IRC name - ''' - - if suspended: - suspended = 1 - - if not sortby: - sortby = 'u' - - url = AUR.common.AUR_URL + '/accounts/' - data = { - 'Action': 'SearchAccounts', - 'O' : -ACCOUNT_RESULTS_PER_PAGE, - } - # 0 : 50 - for field, value in ( - ('U', username), - ('T', typ), - ('S', suspended), - ('E', email), - ('R', realname), - ('I', ircname), - ('SB', sortby) - ): - if value: - data[field] = value - - headers = None - accounts = list() - - while True: - data['O'] += ACCOUNT_RESULTS_PER_PAGE - encdata = urllib.parse.urlencode(data).encode('UTF-8') - parser = AccountScraper() - with self.opener.open(url, encdata) as f: - parser.feed(f.read().decode()) - if headers is None: - headers = parser.headers.copy() - if parser.accounts: - accounts.extend(parser.accounts) - if max_results > 0 and len(accounts) > max_results: - accounts = accounts[:max_results] - break - elif len(parser.accounts) < ACCOUNT_RESULTS_PER_PAGE: - break - else: - break - - return headers, accounts - - - @XCGF.deprecated - def upload_pkg(self, fpath, confirm=True): - ''' - Upload a package to the AUR. This is no longer supported. - ''' - raise AurtomaticError('Package uploads are no longer supported since AUR 4.0 due to the move to Git repos via SSH. Handling that is better left to user scripts.') - - - - def save_cookies(self, path=None): - ''' - Save cookie jar. - ''' - if path is None: - path = self.cookiejar_path - if path is None: - raise AurtomaticError('save_cookies: no cookiejar path given') - # For Curl compatibility (not sure which one fails to comply with the standard. - for cookie in self.cookiejar: - if not cookie.expires: - cookie.expires = 0 - self.cookiejar.save(path, ignore_discard=True, ignore_expires=True) - - - def load_cookies(self, path=None): - ''' - Load cookie jar. - ''' - if path is None: - path = self.cookiejar_path - if path is None: - raise AurtomaticError('load_cookies: no cookiejar path given') - try: - # For Curl compatibility (not sure which one fails to comply with the standard. - self.cookiejar.load(path, ignore_discard=True, ignore_expires=True) - for cookie in self.cookiejar: - if not cookie.expires: - cookie.expires = None - except http.cookiejar.LoadError: - pass - except IOError as e: - if e.errno != errno.ENOENT: - raise e - - - - def remove_cookies(self, path=None): - ''' - Save cookie jar. - ''' - if path is None: - path = self.cookiejar_path - if path is None: - raise AurtomaticError('remove_cookies: no cookiejar path given') - else: - try: - os.unlink(self.cookiejar_path) - except FileNotFoundError: - pass + self.token = token + + # self.rpc = AUR.RPC.AUR(ttl=0, clean=False) + self.rpc = AUR.RPC.AurRpc() + + def get_info(self, pkgname): + """ + Get package information from the RPC interface. + """ + for pkg in self.rpc.info(pkgname): + return pkg + + def load_token(self): + """ + Attempt to load the hidden token. If the token is empty after this operation + then the user is not currently logged in, so it doubles as a login check. + """ + parser = TokenScraper() + with self.opener.open(TOKENSCRAPER_URL) as f: + parser.feed(f.read().decode()) + if parser.token: + self.token = parser.token + return True + else: + return False + def login(self, user=None, passwd=None, login_file=None, remember_me=True): + """ + Log in to the AUR. + """ + if login_file is not None: + user, passwd = load_login_file(login_file) + if user is None or passwd is None: + logging.info("logging in to the AUR") + if user is None: + user = input("Username: ") - def initialize(self, user=None, passwd=None, login_file=None, cookiejar_path=None): - ''' - Reload token and log in if necessary. - ''' - self.load_cookies(cookiejar_path) - if not self.load_token(): - self.login(user=user, passwd=passwd, login_file=login_file) - if not self.load_token(): - raise AurtomaticError('login appears to have failed\n') - elif cookiejar_path: - self.save_cookies(cookiejar_path) + if passwd is None: + passwd = getpass.getpass() + data = [("user", user), ("passwd", passwd)] + if remember_me: + data.append(("remember_me", "1")) -class CookieWrapper(object): - ACTIONS = ('ask', 'keep', 'remove') - - def __init__(self, path=None, action='ask', login_file=None): - self.action = action - self.login_file=login_file - self.aurtomatic = Aurtomatic(cookiejar_path=path) - - - def __enter__(self): - ''' - Cookie context manager. - ''' - self.aurtomatic.initialize(login_file=self.login_file) - return self.aurtomatic - - - - def __exit__(self, typ, value, traceback): - ''' - Cookie context manager. - ''' - action = self.action - - if action not in ('remove', 'keep'): - cookie_prompts = ( - 'Keep cookie jar? [y/n]', - 'Invalid response. Would you like to keep the cookie jar? [y/n]', - 'Please enter "y" or "n". Would you like to keep the cookie jar? [y/n]', - 'Wtf is wrong with you? Just press "y" or "n". I don\'t even care about the case.', - 'I am not going to ask you again. Do you want to keep the cookie jar or what?' - ) - ans = 'n' - for prompt in cookie_prompts: - ans = input(prompt + ' ').lower() - if ans in 'yn': - break - else: - print('Ok, that\'s it, @#$^ your cookies! Have fun logging in again!') - ans = 'n' - if ans == 'n': - action = 'remove' - else: - action = 'keep' - - if action == 'remove': - self.aurtomatic.remove_cookies() - else: - self.aurtomatic.save_cookies() + data = urllib.parse.urlencode(data).encode("UTF-8") + with self.opener.open(LOGIN_URL, data) as f: + pass + # python3-AUR could be used to cache the data, but sometimes the data must be + # fresh, such as when confirming the upload. + def submit_package_form( + self, + pkginfo, + action, + confirm=False, + merge_into=None, + value=None, + comment=None, + comment_id=None, + ): + """ + Submit a form to the AUR. + """ + + if comment is not None: + XCGF.warn_deprecated( + 'keyword argument "comment" in submit_package_form is deprecated in favor of "value"' + ) + value = comment + + ID = pkginfo["ID"] + url = ACTION_URL + "/{}/".format(pkginfo["PackageBase"]) + + # Perform one of the link-based package actions. + if action in PACKAGE_ACTIONS: + url += action + data = (("token", self.token),) + + elif action in FORM_ACTIONS: + if action in DO_ACTIONS or action == "flag": + ID = pkginfo["PackageBaseID"] + a = FORM_ACTIONS[action] + data = [ + ("IDs[{!s}]".format(ID), "1"), + ("ID", ID), + ("token", self.token), + ("do_{}".format(a), a), + ] + if confirm: + data.append(("confirm", "1")) + if merge_into: + data.append(("merge_Into", merge_into)) + if action == "flag": + if not value: + value = prompt_input(pkginfo, "Why you are flagging this package?") + data.append(("comments", value)) + + elif action == "comment": + if not value: + value = prompt_input(pkginfo, "Enter a comment.") + if value: + data = ( + ("action", "do_AddComment"), + ("ID", ID), + ("token", self.token), + ("comment", value), + ) + else: + raise AurtomaticError("no comment submitted") + + elif action == "setkeywords": + if value is None: + value = prompt_input( + pkginfo, "Enter keywords (or nothing to clear them)." + ) + if value: + if not isinstance(value, str): + try: + value = " ".join(value) + except TypeError: + value = str(value) + else: + value = "" + data = ( + ("action", "do_SetKeywords"), + ("token", self.token), + ("keywords", value), + ) + + elif action == "do_DeleteComment": + if comment_id: + data = ( + ("action", "do_DeleteComment"), + ("comment_id", comment_id), + ("token", self.token), + ("submit", "1"), + ) + else: + raise AurtomaticError("no comment ID submitted for do_DeleteComment") -##################################### Main ##################################### + else: + raise AurtomaticError("unrecognized form action: {}".format(action)) + + logging.debug("POSTing data to {}: {}".format(url, data)) + data = urllib.parse.urlencode(data).encode("UTF-8") + with self.opener.open(url, data) as f: + pass + + def search_accounts( + self, + username=None, + typ=None, + suspended=False, + email=None, + realname=None, + ircname=None, + sortby=None, + max_results=ACCOUNT_RESULTS_PER_PAGE, + ): + """ + Submit a search form and scrape the results. + + Valid types: + u: normal user + t: trusted user + d: developer + td: trusted user & developer + + Valid sortby options: + u: user name + t: account type + r: real name + i: IRC name + """ + + if suspended: + suspended = 1 + + if not sortby: + sortby = "u" + + url = AUR.common.AUR_URL + "/accounts/" + data = { + "Action": "SearchAccounts", + "O": -ACCOUNT_RESULTS_PER_PAGE, + } + # 0 : 50 + for field, value in ( + ("U", username), + ("T", typ), + ("S", suspended), + ("E", email), + ("R", realname), + ("I", ircname), + ("SB", sortby), + ): + if value: + data[field] = value + + headers = None + accounts = list() + + while True: + data["O"] += ACCOUNT_RESULTS_PER_PAGE + encdata = urllib.parse.urlencode(data).encode("UTF-8") + parser = AccountScraper() + with self.opener.open(url, encdata) as f: + parser.feed(f.read().decode()) + if headers is None: + headers = parser.headers.copy() + if parser.accounts: + accounts.extend(parser.accounts) + if max_results > 0 and len(accounts) > max_results: + accounts = accounts[:max_results] + break + elif len(parser.accounts) < ACCOUNT_RESULTS_PER_PAGE: + break + else: + break + + return headers, accounts + + @XCGF.deprecated + def upload_pkg(self, fpath, confirm=True): + """ + Upload a package to the AUR. This is no longer supported. + """ + raise AurtomaticError( + "Package uploads are no longer supported since AUR 4.0 due to the move to Git repos via SSH. Handling that is better left to user scripts." + ) -def parse_args(args=None): - parser = argparse.ArgumentParser(description='Upload packages to the AUR.') - parser.add_argument( - 'paths', metavar='', nargs='*', - help='Arguments are either paths to source archives created with "makepkg --source", or to directories containing such source archives. Simple pattern matching is used to search for "*.src.*". If no paths are given then the current directory is searched.' - ) - parser.add_argument( - '-c', '--cookiejar', metavar='', - help='Specify the path of the cookie jar. The file follows the Netscape format.' - ) - parser.add_argument( - '--comment', action='store_true', - help='Prompt for a comment for each uploaded package. This option requires that the EDITOR environment variable be set.' - ) - parser.add_argument( - '-k', '--keep-cookiejar', dest='keep', action='store_true', - help='Keep the cookie jar.' - ) - parser.add_argument( - '-l', '--login', metavar='', - help='Read name and password from a file. The first line should contain the name and the second the password.' - ) - parser.add_argument( - '-m', '--message', metavar='', - help='Post a message as a comment. The same message will be used for all packages. Use the --comment option to set per-package comments when uploading multiple packages..' - ) - parser.add_argument( - '-n', '--notify', action='store_true', - help='Receive notifications for each uploaded package.' - ) - parser.add_argument( - '-r', '--remove-cookiejar', dest='remove', action='store_true', - help='Remove the cookie jar.' - ) - parser.add_argument( - '-v', '--vote', action='store_true', - help='Vote for each uploaded package.' - ) - return parser.parse_args() + def save_cookies(self, path=None): + """ + Save cookie jar. + """ + if path is None: + path = self.cookiejar_path + if path is None: + raise AurtomaticError("save_cookies: no cookiejar path given") + # For Curl compatibility (not sure which one fails to comply with the standard. + for cookie in self.cookiejar: + if not cookie.expires: + cookie.expires = 0 + self.cookiejar.save(path, ignore_discard=True, ignore_expires=True) + + def load_cookies(self, path=None): + """ + Load cookie jar. + """ + if path is None: + path = self.cookiejar_path + if path is None: + raise AurtomaticError("load_cookies: no cookiejar path given") + try: + # For Curl compatibility (not sure which one fails to comply with the standard. + self.cookiejar.load(path, ignore_discard=True, ignore_expires=True) + for cookie in self.cookiejar: + if not cookie.expires: + cookie.expires = None + except http.cookiejar.LoadError: + pass + except IOError as e: + if e.errno != errno.ENOENT: + raise e + + def remove_cookies(self, path=None): + """ + Save cookie jar. + """ + if path is None: + path = self.cookiejar_path + if path is None: + raise AurtomaticError("remove_cookies: no cookiejar path given") + else: + try: + os.unlink(self.cookiejar_path) + except FileNotFoundError: + pass + + def initialize(self, user=None, passwd=None, login_file=None, cookiejar_path=None): + """ + Reload token and log in if necessary. + """ + self.load_cookies(cookiejar_path) + if not self.load_token(): + self.login(user=user, passwd=passwd, login_file=login_file) + if not self.load_token(): + raise AurtomaticError("login appears to have failed\n") + elif cookiejar_path: + self.save_cookies(cookiejar_path) +class CookieWrapper(object): + ACTIONS = ("ask", "keep", "remove") + + def __init__(self, path=None, action="ask", login_file=None): + self.action = action + self.login_file = login_file + self.aurtomatic = Aurtomatic(cookiejar_path=path) + + def __enter__(self): + """ + Cookie context manager. + """ + self.aurtomatic.initialize(login_file=self.login_file) + return self.aurtomatic + + def __exit__(self, typ, value, traceback): + """ + Cookie context manager. + """ + action = self.action + + if action not in ("remove", "keep"): + cookie_prompts = ( + "Keep cookie jar? [y/n]", + "Invalid response. Would you like to keep the cookie jar? [y/n]", + 'Please enter "y" or "n". Would you like to keep the cookie jar? [y/n]', + 'Wtf is wrong with you? Just press "y" or "n". I don\'t even care about the case.', + "I am not going to ask you again. Do you want to keep the cookie jar or what?", + ) + ans = "n" + for prompt in cookie_prompts: + ans = input(prompt + " ").lower() + if ans in "yn": + break + else: + print("Ok, that's it, @#$^ your cookies! Have fun logging in again!") + ans = "n" + if ans == "n": + action = "remove" + else: + action = "keep" + + if action == "remove": + self.aurtomatic.remove_cookies() + else: + self.aurtomatic.save_cookies() +##################################### Main ##################################### +def parse_args(args=None): + parser = argparse.ArgumentParser(description="Upload packages to the AUR.") + parser.add_argument( + "paths", + metavar="", + nargs="*", + help='Arguments are either paths to source archives created with "makepkg --source", or to directories containing such source archives. Simple pattern matching is used to search for "*.src.*". If no paths are given then the current directory is searched.', + ) + parser.add_argument( + "-c", + "--cookiejar", + metavar="", + help="Specify the path of the cookie jar. The file follows the Netscape format.", + ) + parser.add_argument( + "--comment", + action="store_true", + help="Prompt for a comment for each uploaded package. This option requires that the EDITOR environment variable be set.", + ) + parser.add_argument( + "-k", + "--keep-cookiejar", + dest="keep", + action="store_true", + help="Keep the cookie jar.", + ) + parser.add_argument( + "-l", + "--login", + metavar="", + help="Read name and password from a file. The first line should contain the name and the second the password.", + ) + parser.add_argument( + "-m", + "--message", + metavar="", + help="Post a message as a comment. The same message will be used for all packages. Use the --comment option to set per-package comments when uploading multiple packages..", + ) + parser.add_argument( + "-n", + "--notify", + action="store_true", + help="Receive notifications for each uploaded package.", + ) + parser.add_argument( + "-r", + "--remove-cookiejar", + dest="remove", + action="store_true", + help="Remove the cookie jar.", + ) + parser.add_argument( + "-v", "--vote", action="store_true", help="Vote for each uploaded package." + ) + return parser.parse_args() def main(args=None): - pargs = parse_args(args) - - # Search current directory for source archives if none were specified. This - # allows e.g. "makepkg --source; aurploader" without explicit arguments. - if not pargs.paths: - pkgs = glob.glob('*.src.*') - else: - pkgs = [] - for path in pargs.paths: - if os.path.isdir(path): - ps = glob.glob(os.path.join(path, '*.src.*')) - if ps: - pkgs.extend(ps) - else: - raise AurtomaticError('no source package found in directory ({})'.format(path)) - else: - pkgs.append(path) - - if pargs.remove: - action = 'remove' - elif pargs.keep: - action = 'keep' - else: - action = 'ask' - - with CookieWrapper(path=pargs.cookiejar, action=action, login_file=pargs.login) as aurtomatic: - for pkg in pkgs: - print('Uploading {}'.format(pkg)) - pkginfo = aurtomatic.upload_pkg( - pkg, - confirm=True, - ignore_missing_aurinfo=pargs.ignore_missing_aurinfo, - ) - if pkginfo: - if pargs.vote: - aurtomatic.submit_package_form(pkginfo, 'vote') - if pargs.notify: - aurtomatic.submit_package_form(pkginfo, 'notify') - comment = None - if pargs.comment: - comment = prompt_comment(pkginfo) - elif pargs.message: - comment = pargs.message - if comment: - aurtomatic.submit_package_form(pkginfo, 'comment', comment=comment) - print() + pargs = parse_args(args) + # Search current directory for source archives if none were specified. This + # allows e.g. "makepkg --source; aurploader" without explicit arguments. + if not pargs.paths: + pkgs = glob.glob("*.src.*") + else: + pkgs = [] + for path in pargs.paths: + if os.path.isdir(path): + ps = glob.glob(os.path.join(path, "*.src.*")) + if ps: + pkgs.extend(ps) + else: + raise AurtomaticError( + "no source package found in directory ({})".format(path) + ) + else: + pkgs.append(path) + + if pargs.remove: + action = "remove" + elif pargs.keep: + action = "keep" + else: + action = "ask" + + with CookieWrapper( + path=pargs.cookiejar, action=action, login_file=pargs.login + ) as aurtomatic: + for pkg in pkgs: + print("Uploading {}".format(pkg)) + pkginfo = aurtomatic.upload_pkg( + pkg, + confirm=True, + ignore_missing_aurinfo=pargs.ignore_missing_aurinfo, + ) + if pkginfo: + if pargs.vote: + aurtomatic.submit_package_form(pkginfo, "vote") + if pargs.notify: + aurtomatic.submit_package_form(pkginfo, "notify") + comment = None + if pargs.comment: + comment = prompt_comment(pkginfo) + elif pargs.message: + comment = pargs.message + if comment: + aurtomatic.submit_package_form(pkginfo, "comment", comment=comment) + print() def run_main(args=None): - ''' - Run main() with exception handling. - ''' - try: - main(args) - except (KeyboardInterrupt, BrokenPipeError): - pass - except AurtomaticError as e: - sys.exit('error: {}\n'.format(e.msg)) - except urllib.error.URLError as e: - sys.exit('URLError: {}\n'.format(e)) - - -if __name__ == '__main__': - run_main() + """ + Run main() with exception handling. + """ + try: + main(args) + except (KeyboardInterrupt, BrokenPipeError): + pass + except AurtomaticError as e: + sys.exit("error: {}\n".format(e.msg)) + except urllib.error.URLError as e: + sys.exit("URLError: {}\n".format(e)) + + +if __name__ == "__main__": + run_main() diff --git a/AUR/PkgList.py b/AUR/PkgList.py index 6810fe2..1255859 100644 --- a/AUR/PkgList.py +++ b/AUR/PkgList.py @@ -33,81 +33,80 @@ ################################## Constants ################################### -PKGLIST_PATH = '/packages.gz' +PKGLIST_PATH = "/packages.gz" PKGLIST_URL = AUR.common.AUR_URL + PKGLIST_PATH - ################################## Functions ################################### -def iterate_packages(path): - with gzip.open(path, 'rt') as f: - for line in f: - if line.startswith('#'): - continue - else: - yield line.strip() +def iterate_packages(path): + with gzip.open(path, "rt") as f: + for line in f: + if line.startswith("#"): + continue + else: + yield line.strip() ################################### Classes #################################### + class PkgList(object): - ''' - A class to retrieve and iterate over the list of AUR packages. - ''' - - def __init__(self, path=None, ttl=AUR.common.DEFAULT_TTL, auto_refresh=False): - ''' - path: - The local path under which to store the file. - - ttl: - The time-to-live of the cached file. This is passed to XCGF.mirror as the - cache_time option. - - auto_refresh: - If True, automatically refresh the file when needed. - ''' - if not path: - cache_dir = xdg.BaseDirectory.save_cache_path(AUR.common.XDG_NAME) - path = os.path.join(cache_dir, os.path.basename(PKGLIST_PATH)) - self.path = path - self.ttl = ttl - self.auto_refresh = auto_refresh - try: - self.last_refresh = os.path.getmtime(self.path) - except FileNotFoundError: - self.last_refresh = None - - - - def refresh(self, force=False): - if force: - ttl = 0 - else: - ttl = self.ttl - with XCGF.Lockfile(self.path + '.lck', 'PkgList') as p: - XCGF.mirror(PKGLIST_URL, self.path, cache_time=ttl) - self.last_refresh = time.time() - - - - def __iter__(self): - if self.auto_refresh: - # Refresh the list if it hasn't been refreshed yet - if self.last_refresh is None \ - or time.time() - self.last_refresh > (self.ttl if self.ttl > 0 else AUR.common.DEFAULT_TTL): - self.refresh() - try: - for p in iterate_packages(self.path): - yield p - except FileNotFoundError: - if self.auto_refresh: - logging.warning('previous PkgList auto-refresh failed, attempting to force a refresh') - self.refresh(force=True) + """ + A class to retrieve and iterate over the list of AUR packages. + """ + + def __init__(self, path=None, ttl=AUR.common.DEFAULT_TTL, auto_refresh=False): + """ + path: + The local path under which to store the file. + + ttl: + The time-to-live of the cached file. This is passed to XCGF.mirror as the + cache_time option. + + auto_refresh: + If True, automatically refresh the file when needed. + """ + if not path: + cache_dir = xdg.BaseDirectory.save_cache_path(AUR.common.XDG_NAME) + path = os.path.join(cache_dir, os.path.basename(PKGLIST_PATH)) + self.path = path + self.ttl = ttl + self.auto_refresh = auto_refresh + try: + self.last_refresh = os.path.getmtime(self.path) + except FileNotFoundError: + self.last_refresh = None + + def refresh(self, force=False): + if force: + ttl = 0 + else: + ttl = self.ttl + with XCGF.Lockfile(self.path + ".lck", "PkgList") as p: + XCGF.mirror(PKGLIST_URL, self.path, cache_time=ttl) + self.last_refresh = time.time() + + def __iter__(self): + if self.auto_refresh: + # Refresh the list if it hasn't been refreshed yet + if self.last_refresh is None or time.time() - self.last_refresh > ( + self.ttl if self.ttl > 0 else AUR.common.DEFAULT_TTL + ): + self.refresh() try: - for p in iterate_packages(self.path): - yield p + for p in iterate_packages(self.path): + yield p except FileNotFoundError: - pass + if self.auto_refresh: + logging.warning( + "previous PkgList auto-refresh failed, attempting to force a refresh" + ) + self.refresh(force=True) + try: + for p in iterate_packages(self.path): + yield p + except FileNotFoundError: + pass diff --git a/AUR/RPC.py b/AUR/RPC.py index af78b8c..a7d2545 100644 --- a/AUR/RPC.py +++ b/AUR/RPC.py @@ -17,7 +17,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -''' +""" Retrieve data from the AUR via the RPC interface. Results are cached in an SQLite3 database to avoid redundant queries when @@ -25,7 +25,7 @@ For more information see https://aur.archlinux.org/rpc.php and https://projects.archlinux.org/aurweb.git/plain/doc/rpc.txt -''' +""" import argparse import json @@ -55,115 +55,114 @@ # --------------------------------- Globals ---------------------------------- # -RPC_URL = AUR.common.AUR_URL + '/rpc.php' +RPC_URL = AUR.common.AUR_URL + "/rpc.php" RPC_VERSION = 5 RPC_MAX_ARGS = 500 -CODING = 'UTF-8' +CODING = "UTF-8" # Valid parameters for the chosen version of the RPC interface. -RPC_TYPES = ('info', 'search') -RPC_BYS = ('name', 'name-desc', 'maintainer') +RPC_TYPES = ("info", "search") +RPC_BYS = ("name", "name-desc", "maintainer") RPC_DEFAULT_BY = RPC_BYS[1] INTEGER_FIELDS = ( - 'FirstSubmitted', - 'ID', - 'LastModified', - 'NumVotes', - 'OutOfDate', - 'PackageBaseID', + "FirstSubmitted", + "ID", + "LastModified", + "NumVotes", + "OutOfDate", + "PackageBaseID", ) LIST_FIELDS = ( - 'CheckDepends', - 'Conflicts', - 'Depends', - 'Groups', - 'Keywords', - 'License', - 'MakeDepends', - 'OptDepends', - 'Provides', - 'Replaces', - 'Source', + "CheckDepends", + "Conflicts", + "Depends", + "Groups", + "Keywords", + "License", + "MakeDepends", + "OptDepends", + "Provides", + "Replaces", + "Source", ) URL_FIELDS = ( - 'URL', - 'AURPage', - 'URLPath', + "URL", + "AURPage", + "URLPath", ) DISPLAY_FIELDS = ( - 'PackageBase', - 'Name', - 'Version', - 'Description', - 'URL', - 'URLPath', - 'Maintainer', - 'Depends', - 'MakeDepends', - 'CheckDepends', - 'OptDepends', - 'Conflicts', - 'Provides', - 'Replaces', - 'Groups', - 'License', - 'NumVotes', - 'FirstSubmitted', - 'LastModified', - 'LastPackager', - 'OutOfDate', - 'ID', - 'PackageBaseID', - 'Keywords' + "PackageBase", + "Name", + "Version", + "Description", + "URL", + "URLPath", + "Maintainer", + "Depends", + "MakeDepends", + "CheckDepends", + "OptDepends", + "Conflicts", + "Provides", + "Replaces", + "Groups", + "License", + "NumVotes", + "FirstSubmitted", + "LastModified", + "LastPackager", + "OutOfDate", + "ID", + "PackageBaseID", + "Keywords", ) # ----------------------------- List Formatting ------------------------------ # + def lst_to_txt(lst): - ''' + """ Prepare a list for storage in a text field. - ''' - return '\n'.join(lst) + """ + return "\n".join(lst) def txt_to_lst(txt): - ''' + """ Convert a textified list back to a list. - ''' + """ if txt: - return txt.split('\n') + return txt.split("\n") else: return list() # --------------------------------- RPC URL ---------------------------------- # + def rpc_url(typ, args, by=RPC_DEFAULT_BY, post=False): - ''' + """ Format the RPC URL. - ''' - qs = [ - ('v', RPC_VERSION), - ('type', typ) - ] - if typ == 'info': - param = 'arg[]' - elif typ == 'search': - param = 'arg' - qs.append(('by', by)) + """ + qs = [("v", RPC_VERSION), ("type", typ)] + if typ == "info": + param = "arg[]" + elif typ == "search": + param = "arg" + qs.append(("by", by)) else: - param = 'arg' + param = "arg" qs.extend((param, a) for a in args) qs_str = urllib.parse.urlencode(qs) if post: return RPC_URL, qs_str.encode(CODING) else: - return '{}?{}'.format(RPC_URL, qs_str) + return "{}?{}".format(RPC_URL, qs_str) # ------------------------------ HTML Scraping ------------------------------- # @@ -171,10 +170,12 @@ def rpc_url(typ, args, by=RPC_DEFAULT_BY, post=False): # This is a temporary page scraper to get the last packager which is currently # omitted from the RPC info. + class LastPackagerParser(HTMLParser): - ''' + """ Parse the last packager from the AUR package page. - ''' + """ + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.in_pkginfo = False @@ -183,61 +184,62 @@ def __init__(self, *args, **kwargs): self.last_packager = None def handle_starttag(self, tag, attrs): - if tag == 'table' and ('id', 'pkginfo') in attrs: + if tag == "table" and ("id", "pkginfo") in attrs: self.in_pkginfo = True elif not self.in_pkginfo: return - if tag == 'th': + if tag == "th": self.in_th = True - elif self.in_last_packager and tag == 'a': + elif self.in_last_packager and tag == "a": self.in_last_packager = True def handle_endtag(self, tag): self.in_th = False - if tag == 'table': + if tag == "table": self.in_pkginfo = False - if tag == 'td': + if tag == "td": self.in_last_packager = False def handle_data(self, data): - if self.in_th and data.strip() == 'Last Packager:': + if self.in_th and data.strip() == "Last Packager:": self.in_last_packager = True elif self.in_last_packager: self.last_packager = data.strip() def add_last_packager(pkg): - ''' + """ Get the last packager by scraping the AUR webpage of the given package. - ''' - url = AUR.common.AUR_URL + f'''/packages/{urllib.parse.quote_plus(pkg['Name'])}''' + """ + url = AUR.common.AUR_URL + f"""/packages/{urllib.parse.quote_plus(pkg['Name'])}""" with urllib.request.urlopen(url) as f: html_code = f.read().decode() parser = LastPackagerParser() parser.feed(html_code) - pkg['LastPackager'] = parser.last_packager + pkg["LastPackager"] = parser.last_packager # ----------------------------------- AUR ------------------------------------ # + def insert_full_urls(pkgs): - ''' + """ Replace partial URLS with full URLS for each passed package. - ''' + """ for pkg in pkgs: try: - if not pkg['URLPath'].startswith(AUR.common.AUR_URL): - pkg['URLPath'] = AUR.common.AUR_URL + pkg['URLPath'] + if not pkg["URLPath"].startswith(AUR.common.AUR_URL): + pkg["URLPath"] = AUR.common.AUR_URL + pkg["URLPath"] except (KeyError, TypeError): pass yield pkg def convert_pkginfo(pkg): - ''' + """ Convert package info fields to expected formats. - ''' + """ for key in INTEGER_FIELDS: try: @@ -247,40 +249,40 @@ def convert_pkginfo(pkg): except KeyError: pass except TypeError: - logging.error('failed to convert {} to integer ({})'.format(key, pkg[key])) -# for key in LIST_FIELDS: -# if key not in pkg: -# pkg[key] = list() + logging.error("failed to convert {} to integer ({})".format(key, pkg[key])) + # for key in LIST_FIELDS: + # if key not in pkg: + # pkg[key] = list() return pkg def format_rpc_args(args): - ''' + """ Ensure that the arguments are a list. If None, then a list with a single empty string will be returned to ensure that e.g. orphan searches work as expected. - ''' + """ if args is None: - return ('',) + return ("",) elif isinstance(args, str): return (args,) else: - return tuple(a if a is not None else '' for a in args) + return tuple(a if a is not None else "" for a in args) def rpc_info(args): - ''' + """ MemoizeDB glue function for RPC info queries. - ''' - for pkg in aur_query('info', args): - yield pkg['Name'], (json.dumps(pkg),) + """ + for pkg in aur_query("info", args): + yield pkg["Name"], (json.dumps(pkg),) def rpc_search_by(args, by=RPC_DEFAULT_BY): - ''' + """ MemoizeDB glue function for RPC search queries. - ''' + """ for arg in args: - hits = list(aur_query('search', (arg,), by=by)) + hits = list(aur_query("search", (arg,), by=by)) if hits: yield arg, (json.dumps(hits),) else: @@ -288,18 +290,18 @@ def rpc_search_by(args, by=RPC_DEFAULT_BY): def aur_query(typ, args, by=RPC_DEFAULT_BY): - ''' + """ Query the AUR RPC interface. - ''' + """ for r in _aur_query_wrapper(typ, format_rpc_args(args), by=by): yield r def _aur_query_wrapper(typ, args, by=RPC_DEFAULT_BY): - ''' + """ Internal function. This will split long query strings when necessary to retrieve all of the results. - ''' + """ url = rpc_url(typ, args, by=by) try: for r in _aur_query(typ, url): @@ -321,35 +323,36 @@ def _aur_query_wrapper(typ, args, by=RPC_DEFAULT_BY): def _aur_query(typ, url, post_data=None): - ''' + """ Internal function. Iterate over results. - ''' - logging.debug('retrieving {}'.format(url)) + """ + logging.debug("retrieving {}".format(url)) with urllib.request.urlopen(url, data=post_data) as f: response = json.loads(f.read().decode(CODING)) - logging.debug(json.dumps(response, indent=' ', sort_keys=True)) + logging.debug(json.dumps(response, indent=" ", sort_keys=True)) try: - rtyp = response['type'] - if rtyp == typ or (rtyp == 'multiinfo' and typ == 'info'): - if response['resultcount'] == 0: - logging.info('no results found') + rtyp = response["type"] + if rtyp == typ or (rtyp == "multiinfo" and typ == "info"): + if response["resultcount"] == 0: + logging.info("no results found") return - for r in response['results']: + for r in response["results"]: yield r - elif rtyp == 'error': - logging.error('RPC error {}'.format(response['results'])) + elif rtyp == "error": + logging.error("RPC error {}".format(response["results"])) else: - logging.error('Unexpected RPC return type {}'.format(rtyp)) + logging.error("Unexpected RPC return type {}".format(rtyp)) except KeyError: - logging.error('Unexpected RPC error.') + logging.error("Unexpected RPC error.") # ------------------------------ AurError Class ------------------------------ # + class AurError(Exception): - ''' + """ Exception raised by AUR objects. - ''' + """ def __init__(self, msg, error=None): self.msg = msg @@ -358,16 +361,19 @@ def __init__(self, msg, error=None): # -------------------------------- AUR Class --------------------------------- # + class AurRpc(object): - ''' + """ Interact with the Arch Linux User Repository (AUR) Data retrieved via the RPC interface is cached temporarily in an SQLite3 database to avoid unnecessary remote calls. - ''' + """ - def __init__(self, database=None, mdb=None, ttl=AUR.common.DEFAULT_TTL, clean=False): - ''' + def __init__( + self, database=None, mdb=None, ttl=AUR.common.DEFAULT_TTL, clean=False + ): + """ Initialize the AUR object. database: @@ -380,26 +386,29 @@ def __init__(self, database=None, mdb=None, ttl=AUR.common.DEFAULT_TTL, clean=Fa clean: Clean the database to remove old entries and ensure integrity. - ''' + """ if not database: cachedir = xdg.BaseDirectory.save_cache_path(AUR.common.XDG_NAME) - database = os.path.join(cachedir, 'RPC.sqlite3') + database = os.path.join(cachedir, "RPC.sqlite3") if mdb is None: - glue = {'info': (rpc_info, (('data', 'TEXT'),), ttl)} + glue = {"info": (rpc_info, (("data", "TEXT"),), ttl)} for by in RPC_BYS: - table = 'search_by_{}'.format(by.replace('-', '_')) + table = "search_by_{}".format(by.replace("-", "_")) + # by=by is required to bind the function and ensure that all 3 are # different. - def f(xs, by=by): return rpc_search_by(xs, by=by) - glue[table] = (f, (('data', 'TEXT'),), ttl) + def f(xs, by=by): + return rpc_search_by(xs, by=by) + + glue[table] = (f, (("data", "TEXT"),), ttl) conn = sqlite3.connect( database, detect_types=(sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES), - isolation_level=None + isolation_level=None, ) mdb = MemoizeDB.MemoizeDB(conn, glue) mdb.db_initialize() @@ -412,14 +421,14 @@ def f(xs, by=by): return rpc_search_by(xs, by=by) # ------------------------ Accessibility Methods ------------------------ # def get(self, *args, **kwargs): - ''' + """ See the documentation for _get. - ''' + """ for pkg in self._get(*args, **kwargs): yield convert_pkginfo(pkg) def _get(self, typ, args, by=RPC_DEFAULT_BY, intersect=False, last_packager=False): - ''' + """ Get package information from the AUR RPC interface using locally cached data when available and still valid. @@ -435,21 +444,21 @@ def _get(self, typ, args, by=RPC_DEFAULT_BY, intersect=False, last_packager=Fals last_packager: If True, scrape the last packager from the AUR website when returning info. - ''' + """ if args is not None and not args: return - if typ == 'info': - table = 'info' - elif typ == 'search': + if typ == "info": + table = "info" + elif typ == "search": if by in RPC_BYS: - table = 'search_by_{}'.format(by.replace('-', '_')) + table = "search_by_{}".format(by.replace("-", "_")) else: raise AurError('unrecognized "by": {}'.format(by)) else: - raise AurError('unrecognized operation: {}'.format(typ)) + raise AurError("unrecognized operation: {}".format(typ)) - if typ == 'info': + if typ == "info": # Determine the names of packages for which information must be retrieved. # by stripping version requirements from the names. Once the data has been # retrieved, the version requirements will be checked and only those @@ -464,7 +473,7 @@ def _get(self, typ, args, by=RPC_DEFAULT_BY, intersect=False, last_packager=Fals if last_packager: add_last_packager(pkg) if XCPF.satisfies_all_version_requirements( - pkg['Version'], ver_reqs[pkg['Name']] + pkg["Version"], ver_reqs[pkg["Name"]] ): yield pkg @@ -472,186 +481,194 @@ def _get(self, typ, args, by=RPC_DEFAULT_BY, intersect=False, last_packager=Fals if intersect: n = 0 hits = dict() - for results in self.mdb.get_nth_field_many(table, format_rpc_args(args)): + for results in self.mdb.get_nth_field_many( + table, format_rpc_args(args) + ): n += 1 for pkg in json.loads(results): try: - hit = hits[pkg['Name']] + hit = hits[pkg["Name"]] except KeyError: - hits[pkg['Name']] = (1, pkg) + hits[pkg["Name"]] = (1, pkg) else: - hits[pkg['Name']] = (hit[0] + 1, pkg) + hits[pkg["Name"]] = (hit[0] + 1, pkg) for count, pkg in hits.values(): if count == n: yield pkg else: - for results in self.mdb.get_nth_field_many(table, format_rpc_args(args)): + for results in self.mdb.get_nth_field_many( + table, format_rpc_args(args) + ): if results is not None: for pkg in json.loads(results): yield pkg def info(self, *args, **kwargs): - ''' + """ Retrieve package information. - ''' - return self.get('info', *args, **kwargs) + """ + return self.get("info", *args, **kwargs) def search(self, *args, by=RPC_DEFAULT_BY, intersect=False, **kwargs): - ''' + """ Search for packages. - ''' - return self.get('search', *args, by=by, intersect=intersect, **kwargs) + """ + return self.get("search", *args, by=by, intersect=intersect, **kwargs) def msearch(self, *args, **kwargs): - ''' + """ Search for packages by maintainer. Only the names are returned. - ''' - warnings.warn('deprecated', category=DeprecationWarning) - return self.get('search', *args, by='maintainer', **kwargs) + """ + warnings.warn("deprecated", category=DeprecationWarning) + return self.get("search", *args, by="maintainer", **kwargs) # --------------------------------- Download --------------------------------- # + def download_archives(output_dir, pkgs): - ''' + """ Download the AUR files to the target directory. - ''' + """ os.makedirs(output_dir, exist_ok=True) for pkg in insert_full_urls(pkgs): - logging.debug('Retrieving and extracting {} to {}.'.format( - pkg['URLPath'], output_dir) + logging.debug( + "Retrieving and extracting {} to {}.".format(pkg["URLPath"], output_dir) ) - with urllib.request.urlopen(pkg['URLPath']) as f: - logging.debug('extracting {} to {}'.format(pkg['URLPath'], output_dir)) - tarfile.open(mode='r|gz', fileobj=f).extractall(path=output_dir) + with urllib.request.urlopen(pkg["URLPath"]) as f: + logging.debug("extracting {} to {}".format(pkg["URLPath"], output_dir)) + tarfile.open(mode="r|gz", fileobj=f).extractall(path=output_dir) yield pkg def download_git_repo(output_dir, pkgs, warn=False, pull=False): - ''' + """ Download the AUR files to the target directory. - ''' + """ os.makedirs(output_dir, exist_ok=True) for pkg in pkgs: git_url = AUR.common.AUR_GIT_URL_FORMAT.format( - urllib.parse.quote_plus(pkg['PackageBase']) + urllib.parse.quote_plus(pkg["PackageBase"]) ) - repo_dir = os.path.join(output_dir, pkg['PackageBase']) + repo_dir = os.path.join(output_dir, pkg["PackageBase"]) if os.path.isdir(repo_dir): if pull: - cmd = ['git', '-C', repo_dir, 'pull'] + cmd = ["git", "-C", repo_dir, "pull"] else: - cmd = ['git', '-C', repo_dir, 'fetch'] + cmd = ["git", "-C", repo_dir, "fetch"] else: - cmd = ['git', 'clone', git_url, repo_dir] - logging.debug('running {}'.format(XCGF.sh_quote_words(cmd))) + cmd = ["git", "clone", git_url, repo_dir] + logging.debug("running {}".format(XCGF.sh_quote_words(cmd))) try: subprocess.run(cmd, stderr=subprocess.PIPE, check=True) - if not os.path.exists(os.path.join(repo_dir, 'PKGBUILD')): + if not os.path.exists(os.path.join(repo_dir, "PKGBUILD")): shutil.rmtree(repo_dir) - raise RuntimeError('probably no repo') - with open(os.path.join(repo_dir, '.SRCINFO'), 'r') as handle: - pkg['Names'] = list( - line.split('=')[1].strip() - for line in handle if line.startswith('pkgname =') + raise RuntimeError("probably no repo") + with open(os.path.join(repo_dir, ".SRCINFO"), "r") as handle: + pkg["Names"] = list( + line.split("=")[1].strip() + for line in handle + if line.startswith("pkgname =") ) yield pkg except (subprocess.CalledProcessError, RuntimeError) as e: f = logging.warn if warn else logging.error - f('failed to clone or fetch {} to {} [{}]'.format(git_url, repo_dir, e) - ) + f("failed to clone or fetch {} to {} [{}]".format(git_url, repo_dir, e)) # ----------------------------- User Interaction ----------------------------- # + def parse_args(args=None): - ''' + """ Parse command-line arguments. If no arguments are passed then arguments are read from sys.argv. - ''' + """ parser = argparse.ArgumentParser( - description='Query the AUR RPC interface.', - epilog='For maintainer searches, use an empty string (\'\') as an argument to search for orphans.' - ) - parser.add_argument( - 'args', metavar='', nargs='+' + description="Query the AUR RPC interface.", + epilog="For maintainer searches, use an empty string ('') as an argument to search for orphans.", ) + parser.add_argument("args", metavar="", nargs="+") parser.add_argument( - '-i', '--info', action='store_true', - help='Query package information.' + "-i", "--info", action="store_true", help="Query package information." ) + parser.add_argument("-s", "--search", action="store_true", help="Search the AUR.") parser.add_argument( - '-s', '--search', action='store_true', - help='Search the AUR.' + "--by", + choices=RPC_BYS, + default=RPC_DEFAULT_BY, + help="By which fields to search. Default: %(default)s", ) + parser.add_argument("--debug", action="store_true", help="Enable debugging.") parser.add_argument( - '--by', choices=RPC_BYS, default=RPC_DEFAULT_BY, - help='By which fields to search. Default: %(default)s' + "--log", metavar="", help="Log debugging information to ." ) parser.add_argument( - '--debug', action='store_true', - help='Enable debugging.' + "--ttl", + metavar="", + type=int, + default=(AUR.common.DEFAULT_TTL // 60), + help="Time-to-live of cached data (default: %(default)s)", ) parser.add_argument( - '--log', metavar='', - help='Log debugging information to .' + "--full-info", + action="store_true", + help="Return full information for searches and msearches.", ) parser.add_argument( - '--ttl', metavar='', type=int, default=(AUR.common.DEFAULT_TTL // 60), - help='Time-to-live of cached data (default: %(default)s)' - ) - parser.add_argument( - '--full-info', action='store_true', - help='Return full information for searches and msearches.' - ) - parser.add_argument( - '--intersect', action='store_true', - help='When searching for packages, only return results that match all search terms.' + "--intersect", + action="store_true", + help="When searching for packages, only return results that match all search terms.", ) return parser.parse_args(args) def format_pkginfo(pkgs): - ''' + """ Format package information for display similarly to "pacman -Si". This function modifies the passed packages. - ''' + """ fields = list(DISPLAY_FIELDS) - fields.insert(0, 'Repository') - fields.insert(6, 'AURPage') + fields.insert(0, "Repository") + fields.insert(6, "AURPage") # +1 for space, +1 for _ separator for arch-specific fields - w = max(map(len, fields)) + 2 + \ - max(map(len, XCPF.ARCHLINUX_OFFICIAL_ARCHITECTURES)) - fmt = '{{:<{:d}s}}: {{!s:=")'.format(line)) - continue - - if line[0] == '\t': - try: - obj[k].append(v) - except KeyError: - obj[k] = [v] - - else: - obj = dict() - if k == PKGBASE_STRING: - srcinfo[PKGBASE_STRING] = (v, obj) - else: try: - srcinfo[k][v] = obj - except KeyError: - srcinfo[k] = {v : obj} - - logging.debug(json.dumps(srcinfo, indent=' ', sort_keys=True)) - return srcinfo + k, v = stripped_line.split(" = ", 1) + except ValueError: + logging.warning( + 'unexpected line while parsing SRCINFO: "{}" (expected format: "=")'.format( + line + ) + ) + continue + + if line[0] == "\t": + try: + obj[k].append(v) + except KeyError: + obj[k] = [v] + + else: + obj = dict() + if k == PKGBASE_STRING: + srcinfo[PKGBASE_STRING] = (v, obj) + else: + try: + srcinfo[k][v] = obj + except KeyError: + srcinfo[k] = {v: obj} + logging.debug(json.dumps(srcinfo, indent=" ", sort_keys=True)) + return srcinfo def read_srcinfo_file(path): - ''' - Read a .SRCINFO file. - ''' - with open(path, 'r') as f: - return parse_srcinfo(f) - + """ + Read a .SRCINFO file. + """ + with open(path, "r") as f: + return parse_srcinfo(f) def read_srcinfo_url(url): - ''' - Read a .SRCINFO URL. - ''' - text = XCGF.text_from_url(url) - return parse_srcinfo(text.split('\n')) - + """ + Read a .SRCINFO URL. + """ + text = XCGF.text_from_url(url) + return parse_srcinfo(text.split("\n")) def get_pkginfo(srcinfo, pkgname): - ''' - Return package information for one of the packages within the SRCINFO. - ''' - try: - pkginfo = srcinfo[PKGNAME_STRING][pkgname].copy() + """ + Return package information for one of the packages within the SRCINFO. + """ try: - pkgbase = srcinfo[PKGBASE_STRING][0] - pkginfo = insert_pkgbase(pkginfo, srcinfo[PKGBASE_STRING][1]) + pkginfo = srcinfo[PKGNAME_STRING][pkgname].copy() + try: + pkgbase = srcinfo[PKGBASE_STRING][0] + pkginfo = insert_pkgbase(pkginfo, srcinfo[PKGBASE_STRING][1]) + except KeyError: + pkgbase = pkgname + pass except KeyError: - pkgbase = pkgname - pass - except KeyError: - return None - pkginfo[PKGBASE_STRING] = pkgbase - pkginfo[PKGNAME_STRING] = pkgname - return pkginfo - - + return None + pkginfo[PKGBASE_STRING] = pkgbase + pkginfo[PKGNAME_STRING] = pkgname + return pkginfo ################################## AurSrcInfo ################################## + class AurSrcinfo(object): - ''' - A caching AUR .SRCINFO retriever. - ''' - SRCINFO_TABLE = 'srcinfo' - def __init__(self, mdb=None, dbpath=None, ttl=AUR.common.DEFAULT_TTL): - if mdb is None: - if dbpath is None: - dbpath = srcinfo_dbpath() - def f(pkgnames): - for pkgname in pkgnames: - url = srcinfo_url(pkgname) - try: - yield pkgname, (XCGF.text_from_url(url),) - except urllib.error.HTTPError as e: - if e.code == 404: - yield pkgname, None + """ + A caching AUR .SRCINFO retriever. + """ + + SRCINFO_TABLE = "srcinfo" + + def __init__(self, mdb=None, dbpath=None, ttl=AUR.common.DEFAULT_TTL): + if mdb is None: + if dbpath is None: + dbpath = srcinfo_dbpath() + + def f(pkgnames): + for pkgname in pkgnames: + url = srcinfo_url(pkgname) + try: + yield pkgname, (XCGF.text_from_url(url),) + except urllib.error.HTTPError as e: + if e.code == 404: + yield pkgname, None + else: + raise e + + glue = {self.SRCINFO_TABLE: (f, (("text", "TEXT"),), ttl)} + conn = sqlite3.connect( + dbpath, + detect_types=(sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES), + isolation_level=None, + ) + mdb = MemoizeDB.MemoizeDB(conn, glue) + mdb.db_initialize() + self.mdb = mdb + + def get(self, args): + """ + Iterate over the parse .SRCINFO files. Returns None if the file could not + be retrieved. This expects PackageBase arguments. + """ + for srcinfo in self.mdb.get_nth_field_many(self.SRCINFO_TABLE, args): + if srcinfo: + yield parse_srcinfo(srcinfo.split("\n")) else: - raise e - glue = { - self.SRCINFO_TABLE : (f, (('text', 'TEXT'),), ttl) - } - conn = sqlite3.connect( - dbpath, - detect_types=(sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES), - isolation_level=None - ) - mdb = MemoizeDB.MemoizeDB(conn, glue) - mdb.db_initialize() - self.mdb = mdb - - - def get(self, args): - ''' - Iterate over the parse .SRCINFO files. Returns None if the file could not - be retrieved. This expects PackageBase arguments. - ''' - for srcinfo in self.mdb.get_nth_field_many(self.SRCINFO_TABLE, args): - if srcinfo: - yield parse_srcinfo(srcinfo.split('\n')) - else: - yield None - - - def get_pkginfo(self, pkgbases_and_pkgnames): - ''' - Retrieve package information. - - pkgbases_and_pkgnames: An iterator over package base and package name pairs. - ''' - pkgbases, pkgnames = zip(*pkgbases_and_pkgnames) - for pkgname, srcinfo in zip(pkgnames, self.get(pkgbases)): - if srcinfo: - yield get_pkginfo(srcinfo, pkgname) - else: - yield None - + yield None + + def get_pkginfo(self, pkgbases_and_pkgnames): + """ + Retrieve package information. + + pkgbases_and_pkgnames: An iterator over package base and package name pairs. + """ + pkgbases, pkgnames = zip(*pkgbases_and_pkgnames) + for pkgname, srcinfo in zip(pkgnames, self.get(pkgbases)): + if srcinfo: + yield get_pkginfo(srcinfo, pkgname) + else: + yield None ##################################### Main ##################################### + def main(args=None): - argparser = argparse.ArgumentParser( - description='Retrieve AUR .SRCINFO files and display them as JSON.' - ) - argparser.add_argument( - 'pkgname', nargs='+' - ) - pargs = argparser.parse_args(args) - a = AurSrcinfo() - json.dump( - tuple(a.get_pkginfo(pargs.pkgname)), - sys.stdout, - indent=' ', - sort_keys=True - ) - - -if __name__ == '__main__': - try: - main() - except KeyboardInterrupt: - pass + argparser = argparse.ArgumentParser( + description="Retrieve AUR .SRCINFO files and display them as JSON." + ) + argparser.add_argument("pkgname", nargs="+") + pargs = argparser.parse_args(args) + a = AurSrcinfo() + json.dump( + tuple(a.get_pkginfo(pargs.pkgname)), sys.stdout, indent=" ", sort_keys=True + ) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + pass diff --git a/AUR/common.py b/AUR/common.py index 058e604..647ba65 100644 --- a/AUR/common.py +++ b/AUR/common.py @@ -19,11 +19,11 @@ ################################## Constants ################################### -XDG_NAME = 'AUR' -AUR_HOST = 'aur.archlinux.org' -AUR_URL = 'https://' + AUR_HOST +XDG_NAME = "AUR" +AUR_HOST = "aur.archlinux.org" +AUR_URL = "https://" + AUR_HOST # AUR_URL = 'https://aur-dev.archlinux.org' -AUR_GIT_URL_FORMAT = AUR_URL + '/{}.git' -AUR_SSH_GIT_URL = 'ssh+git://aur@' + AUR_HOST +AUR_GIT_URL_FORMAT = AUR_URL + "/{}.git" +AUR_SSH_GIT_URL = "ssh+git://aur@" + AUR_HOST DEFAULT_TTL = 15 * 60 diff --git a/examples/comment.py b/examples/comment.py index 90e656d..2c9e4b4 100755 --- a/examples/comment.py +++ b/examples/comment.py @@ -6,16 +6,16 @@ # base of what later became aurtomatic. if argv[1:]: - aurt = Aurtomatic() - aurt.initialize() + aurt = Aurtomatic() + aurt.initialize() - for pkgname in argv[1:]: - pkginfo = aurt.get_info(pkgname) - print("posting comment for {}".format(pkginfo['Name'])) - try: - comment = prompt_comment(pkginfo) - except AurtomaticError as e: - stderr.write(str(e)) - exit(1) - if comment: - aurt.submit_package_form(pkginfo, 'comment', comment=comment) + for pkgname in argv[1:]: + pkginfo = aurt.get_info(pkgname) + print("posting comment for {}".format(pkginfo["Name"])) + try: + comment = prompt_comment(pkginfo) + except AurtomaticError as e: + stderr.write(str(e)) + exit(1) + if comment: + aurt.submit_package_form(pkginfo, "comment", comment=comment) diff --git a/examples/msearch.py b/examples/msearch.py index 4f3604b..82e4a37 100755 --- a/examples/msearch.py +++ b/examples/msearch.py @@ -3,41 +3,45 @@ import AUR.RPC as AUR from sys import argv, exit + def main(): - maintainers = argv[1:] - if not maintainers: - print("usage: {0} [, ...]\n\n e.g. {0} Xyne".format(argv[0])) - exit(1) - - for maintainer in maintainers: - print("{}'s packages, sorted by votes:".format(maintainer)) - aur = AUR.AUR() - pkgs = list(aur.search(maintainer, by='maintainer')) - if pkgs: - pkgs.sort(key=lambda p: p['NumVotes'], reverse=True) - left_width = max( [len(p['Name']) for p in pkgs]) - total_votes = sum(p['NumVotes'] for p in pkgs) - totals = ( - ('packages', len(pkgs)), - ('total votes', total_votes) - ) - right_width = len(str(total_votes)) - - for l, r in totals: - left_width = max(left_width, len(l)) - right_width = max(right_width, len(str(r))) - - format_string = ' {{:<{:d}s}}\t{{:{:d}d}}'.format(left_width, right_width) - for pkg in pkgs: - print(format_string.format(pkg['Name'], pkg['NumVotes'])) - - print(' ' + '-' * left_width + '\t' + '-' * right_width) - print(format_string.format('vote total', total_votes)) - print(format_string.format('package total', len(pkgs))) - else: - print("no packages found for {}".format(maintainer)) - print() + maintainers = argv[1:] + if not maintainers: + print( + "usage: {0} [, ...]\n\n e.g. {0} Xyne".format( + argv[0] + ) + ) + exit(1) + + for maintainer in maintainers: + print("{}'s packages, sorted by votes:".format(maintainer)) + aur = AUR.AUR() + pkgs = list(aur.search(maintainer, by="maintainer")) + if pkgs: + pkgs.sort(key=lambda p: p["NumVotes"], reverse=True) + left_width = max([len(p["Name"]) for p in pkgs]) + total_votes = sum(p["NumVotes"] for p in pkgs) + totals = (("packages", len(pkgs)), ("total votes", total_votes)) + right_width = len(str(total_votes)) + + for l, r in totals: + left_width = max(left_width, len(l)) + right_width = max(right_width, len(str(r))) + + format_string = " {{:<{:d}s}}\t{{:{:d}d}}".format( + left_width, right_width + ) + for pkg in pkgs: + print(format_string.format(pkg["Name"], pkg["NumVotes"])) + + print(" " + "-" * left_width + "\t" + "-" * right_width) + print(format_string.format("vote total", total_votes)) + print(format_string.format("package total", len(pkgs))) + else: + print("no packages found for {}".format(maintainer)) + print() if __name__ == "__main__": - main() + main() diff --git a/examples/vote.py b/examples/vote.py index 248aff1..c79f1ca 100755 --- a/examples/vote.py +++ b/examples/vote.py @@ -3,10 +3,10 @@ from sys import argv if argv[1:]: - aurt = Aurtomatic() - aurt.initialize() + aurt = Aurtomatic() + aurt.initialize() - for pkgname in argv[1:]: - pkginfo = aurt.get_info(pkgname) - print("voting for {}".format(pkginfo['Name'])) - aurt.do_package_action(pkginfo, 'vote') + for pkgname in argv[1:]: + pkginfo = aurt.get_info(pkgname) + print("voting for {}".format(pkginfo["Name"])) + aurt.do_package_action(pkginfo, "vote") From 3d913cf744e2845343c3a69792bf8dc7313b6ce5 Mon Sep 17 00:00:00 2001 From: jmcb Date: Tue, 30 May 2023 16:22:11 +0100 Subject: [PATCH 4/7] Normalise file headers No coding header, shebangs standardised and only where appropriate --- AUR/AurPkg.py | 3 --- AUR/Aurtomatic.py | 4 ++-- AUR/PkgList.py | 3 --- AUR/RPC.py | 3 +-- AUR/SRCINFO.py | 3 +-- AUR/__init__.py | 2 -- AUR/common.py | 3 --- aurpkglist | 2 +- aurploader | 2 +- aurquery | 2 +- aurtomatic | 2 +- aurtus | 2 +- examples/comment.py | 3 ++- examples/msearch.py | 2 +- examples/vote.py | 3 ++- 15 files changed, 14 insertions(+), 25 deletions(-) diff --git a/AUR/AurPkg.py b/AUR/AurPkg.py index cfd2a8a..6cc01c2 100644 --- a/AUR/AurPkg.py +++ b/AUR/AurPkg.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - # Copyright (C) 2016-2021 Xyne # # This program is free software; you can redistribute it and/or diff --git a/AUR/Aurtomatic.py b/AUR/Aurtomatic.py index bcf410e..cd2b7e8 100644 --- a/AUR/Aurtomatic.py +++ b/AUR/Aurtomatic.py @@ -1,5 +1,5 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- +#!/usr/bin/env python + # Copyright (C) 2012-2015 Xyne # diff --git a/AUR/PkgList.py b/AUR/PkgList.py index 1255859..ee54348 100644 --- a/AUR/PkgList.py +++ b/AUR/PkgList.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - # Copyright (C) 2012-2015 Xyne # # This program is free software; you can redistribute it and/or diff --git a/AUR/RPC.py b/AUR/RPC.py index a7d2545..a6595fb 100644 --- a/AUR/RPC.py +++ b/AUR/RPC.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- +#!/usr/bin/env python # Copyright (C) 2009-2021 Xyne # diff --git a/AUR/SRCINFO.py b/AUR/SRCINFO.py index 4a7460b..6a6bd30 100644 --- a/AUR/SRCINFO.py +++ b/AUR/SRCINFO.py @@ -1,5 +1,4 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- +#!/usr/bin/env python # Copyright (C) 2015 Xyne # diff --git a/AUR/__init__.py b/AUR/__init__.py index 56fafa5..e69de29 100644 --- a/AUR/__init__.py +++ b/AUR/__init__.py @@ -1,2 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- diff --git a/AUR/common.py b/AUR/common.py index 647ba65..72eac44 100644 --- a/AUR/common.py +++ b/AUR/common.py @@ -1,6 +1,3 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - # Copyright (C) 2012-2015 Xyne # # This program is free software; you can redistribute it and/or diff --git a/aurpkglist b/aurpkglist index fcfd47c..43bfb87 100755 --- a/aurpkglist +++ b/aurpkglist @@ -1,4 +1,4 @@ -#!python +#!/usr/bin/env python import AUR.common import AUR.PkgList as APL diff --git a/aurploader b/aurploader index e6a96a0..f9b3fe9 100755 --- a/aurploader +++ b/aurploader @@ -1,3 +1,3 @@ -#!python +#!/usr/bin/env python from AUR.Aurtomatic import run_main run_main() diff --git a/aurquery b/aurquery index 0edc548..18cab1c 100755 --- a/aurquery +++ b/aurquery @@ -1,3 +1,3 @@ -#!python +#!/usr/bin/env python from AUR.RPC import run_main run_main() diff --git a/aurtomatic b/aurtomatic index 3fec21b..5a02b9f 100755 --- a/aurtomatic +++ b/aurtomatic @@ -1,4 +1,4 @@ -#!python +#!/usr/bin/env python from AUR.Aurtomatic import Aurtomatic, AurtomaticError, CookieWrapper, PACKAGE_ACTIONS, DO_ACTIONS, VALUE_ACTIONS, prompt_comment import argparse import subprocess diff --git a/aurtus b/aurtus index b22b79c..417d6be 100755 --- a/aurtus +++ b/aurtus @@ -1,4 +1,4 @@ -#!python +#!/usr/bin/env python from AUR.Aurtomatic import Aurtomatic, AurtomaticError, CookieWrapper import argparse diff --git a/examples/comment.py b/examples/comment.py index 2c9e4b4..d57b3c8 100755 --- a/examples/comment.py +++ b/examples/comment.py @@ -1,4 +1,5 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python + from AUR.Aurtomatic import Aurtomatic, AurtomaticError, prompt_comment from sys import argv, stderr, exit diff --git a/examples/msearch.py b/examples/msearch.py index 82e4a37..6d2d4d1 100755 --- a/examples/msearch.py +++ b/examples/msearch.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python import AUR.RPC as AUR from sys import argv, exit diff --git a/examples/vote.py b/examples/vote.py index c79f1ca..3b88a9d 100755 --- a/examples/vote.py +++ b/examples/vote.py @@ -1,4 +1,5 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python + from AUR.Aurtomatic import Aurtomatic from sys import argv From b7eed8b20992cb39f2375fd0945667033f43d837 Mon Sep 17 00:00:00 2001 From: jmcb Date: Tue, 30 May 2023 17:04:58 +0100 Subject: [PATCH 5/7] Create README.md --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..4d9e6d4 --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# python3-aur + +This repo is an **unofficial mirror** of Xyne's Python library for interacting with the AUR (Arch User Repository) + +- Upstream URL: https://xyne.dev/projects/python3-aur/ +- AUR install URL: https://aur.archlinux.org/packages/python3-aur From cdc7d1d230b7d6278d77dfa17e92189e54916aa7 Mon Sep 17 00:00:00 2001 From: jmcb Date: Tue, 30 May 2023 17:09:01 +0100 Subject: [PATCH 6/7] Add dependencies to pyproject.toml --- pyproject.toml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index afac9a4..fd79151 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,9 +10,17 @@ license = {file = "COPYING"} authors = [ {name = "Xyne", email = "xyne@archlinux.org"}, ] +dependencies = [ + "pyalpm", + "pyxdg", + # https://aur.archlinux.org/packages/python3-xcgf + "xcgf", + # https://aur.archlinux.org/packages/python3-xcpf + "xcpf" +] [project.urls] -Home = "http://xyne.dev/projects/python3-aur" +Home = "https://xyne.dev/projects/python3-aur" [project.scripts] aurpkglist = "aurpkglist" From 5a3e362d0db2987942e7497366c2337a55235fb9 Mon Sep 17 00:00:00 2001 From: jmcb Date: Tue, 30 May 2023 17:51:06 +0100 Subject: [PATCH 7/7] Port README contents from webpage --- COPYING => LICENSE | 0 README.md | 201 +++++++++++++++++++++++++++++++++++++++++++++ pyproject.toml | 3 +- 3 files changed, 203 insertions(+), 1 deletion(-) rename COPYING => LICENSE (100%) diff --git a/COPYING b/LICENSE similarity index 100% rename from COPYING rename to LICENSE diff --git a/README.md b/README.md index 4d9e6d4..26713fd 100644 --- a/README.md +++ b/README.md @@ -4,3 +4,204 @@ This repo is an **unofficial mirror** of Xyne's Python library for interacting w - Upstream URL: https://xyne.dev/projects/python3-aur/ - AUR install URL: https://aur.archlinux.org/packages/python3-aur + +_The following text was taken from the upstream html page and converted to markdown_ + +## About + +This package contains Python 3 modules for interacting with the AUR along with some tools that use them. + +Example scripts are provided in the scripts directory. + +See [paconky.py](https://xyne.dev/scripts/conky/) for an example of how this can be used with pyalpm. +## Command-Line Utilities + +The following are installed with the package. See help messages below for more information. + +**aurtomatic** + +Aurtomatic lets you do the following from the command-line for multiple packages: + +* comment +* vote +* unvote +* notify +* unnotify +* flag +* unflag + +Show support for the AUR packages that you use by running + +`aurtomatic -i -a vote` + +### aurquery + +Aurquery is a caching wrapper around the [AUR’s RPC interface](https://aur.archlinux.org/rpc.php) for querying package information from the command line. Information is returned in a format similar to pacman’s “-Si” output. + +### aurploader + +** Note: uploading is deprecated since the switch to Git repos. ** + +Aurploader is a command-line utility for uploading packages to the AUR. You can do the following when uploading a package: + +* select a category (with automatic detection for existing packages) +* post a comment +* request notifications +* vote + +#### Usage + +`aurcomment [...]` + +### aurpkglist + +Print the list of AUR packages. The script also maintains a locally cached list. + +## Modules + +### AUR.RPC + +Retrieve and cache data from the AUR’s [RPC interface](https://aur.archlinux.org/rpc.php). Results are cached in an SQLite3 database and are refreshed after a configurable interval. + +This was the original AUR module before Aurploader was included. + +### AUR.Aurtomatic + +Interact with the AUR. The following actions are supported: + +* log in +* upload packages +* post comments +* all package actions (vote, unvote, notify, unnotify, change tags, etc.) + +This module was originally part of [aurploader](https://xyne.dev/projects/aurploader/). + +### AUR.PkgList + +Retrieve a full list of AUR packages. The module provides a class for mirroring the remote file and iterating over the package names in the list. The gzipped list is available online here. + +### AUR.SRCINFO + +Parse information in a **.SRCINFO** file. + +## Complementary Modules + +* [pyalpm](https://projects.archlinux.org/users/remy/pyalpm.git/) +* [XCPF](https://xyne.dev/projects/python3-xcpf/) +* [Reflector](https://xyne.dev/projects/reflector/) + +## Aurquery Help Message + +```$ aurquery -h + +usage: aurquery [-h] [-i] [-s] [--by {name,name-desc,maintainer}] [--debug] + [--log ] [--ttl ] [--full-info] [--intersect] + [ ...] + +Query the AUR RPC interface. + +positional arguments: + + +options: + -h, --help show this help message and exit + -i, --info Query package information. + -s, --search Search the AUR. + --by {name,name-desc,maintainer} + By which fields to search. Default: name-desc + --debug Enable debugging. + --log Log debugging information to . + --ttl Time-to-live of cached data (default: 15) + --full-info Return full information for searches and msearches. + --intersect When searching for packages, only return results that + match all search terms. + +For maintainer searches, use an empty string ('') as an argument to search for +orphans. +``` + +## Aurtomatic Help Message Output + +```$ aurtomatic -h + +usage: aurtomatic [-h] + [-a {adopt,comment,delete,disown,flag,notify,setkeywords,unflag,unnotify,unvote,vote} [{adopt,comment,delete,disown,flag,notify,setkeywords,unflag,unnotify,unvote,vote} ...]] + [--comment COMMENT] [--comment-from-file ] + [--keywords [KEYWORD ...]] [-i] [-q] [-c ] + [-j {ask,keep,remove}] [-l ] [--confirm] + [--merge-into MERGE_INTO] + [ ...] + +Post comments to the AUR. + +positional arguments: + + +options: + -h, --help show this help message and exit + -a {adopt,comment,delete,disown,flag,notify,setkeywords,unflag,unnotify,unvote,vote} [{adopt,comment,delete,disown,flag,notify,setkeywords,unflag,unnotify,unvote,vote} ...], --action {adopt,comment,delete,disown,flag,notify,setkeywords,unflag,unnotify,unvote,vote} [{adopt,comment,delete,disown,flag,notify,setkeywords,unflag,unnotify,unvote,vote} ...] + Action(s) to perform for each specified package. + --comment COMMENT Post a comment without the interactive prompt. + --comment-from-file + Load a comment from a path without the interactive + prompt. + --keywords [KEYWORD ...] + Set keywords without the interactive prompt. + -i, --installed Perform action for all installed AUR packages. Use + this to vote for the packages that you use and show + support. + -q, --quiet Suppress output. + +cookie-management arguments: + -c , --cookiejar + Specify the path of the cookie jar. The file follows + the Netscape format. + -j {ask,keep,remove}, --jar {ask,keep,remove} + What to do with the cookiejar. Default: ask. + -l , --login + Read name and password from a file. The first line + should contain the name and the second the password. + +deletion arguments: + --confirm Confirm deletion and other actions requiring + additional confirmation. + --merge-into MERGE_INTO + Merge target when deleting package. +``` +## Aurpkglist Help Message Output + +```$ aurpkglist -h + +usage: aurpkglist [-h] [-f] [-t TTL] [-p PATH] [-q] + +Retrieve and print the full list of AUR packages. + +options: + -h, --help show this help message and exit + -f, --force Force a refresh of the local file. + -t TTL, --ttl TTL, --time TTL + The time, in seconds, to cache the local file, counted + from the last modification. Default: 900 + -p PATH, --path PATH Set the local file path. + -q, --quiet Refresh the local file without printing the list of + packages. +``` +## AurTUs Help Message Output + +```$ aurtus -h + +usage: aurtus [-h] [-c ] [-j {ask,keep,remove}] [-l ] + +Retrieve the current list of trusted users (TUs) from the AUR. + +options: + -h, --help show this help message and exit + -c , --cookiejar + Specify the path of the cookie jar. The file follows + the Netscape format. + -j {ask,keep,remove}, --jar {ask,keep,remove} + What to do with the cookiejar. Default: ask. + -l , --login + Read name and password from a file. The first line + should contain the name and the second the password. +``` diff --git a/pyproject.toml b/pyproject.toml index fd79151..190a43a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,8 @@ build-backend = "setuptools.build_meta" name = "AUR" version = "2021.11.20" description = "AUR-related modules and helper utilities (aurploader, aurquery, aurtomatic)." -license = {file = "COPYING"} +readme = "README.md" +license = {file = "LICENSE"} authors = [ {name = "Xyne", email = "xyne@archlinux.org"}, ]