Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Miscellanous fixups of a miscellaneous nature #171

Closed
wants to merge 7 commits into from
2 changes: 1 addition & 1 deletion src/aurman/bash_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def possible_completions():
return

results = run(
"expac -Ss '%n' ^{}".format(cur), shell=True, stdout=PIPE, stderr=DEVNULL, universal_newlines=True
["expac", "-Ss", "%n", "^{}".format(cur)], stdout=PIPE, stderr=DEVNULL, universal_newlines=True
).stdout.splitlines()

results.extend([ret_dict["Name"] for ret_dict in get_aur_info((cur,), True, True)])
Expand Down
76 changes: 26 additions & 50 deletions src/aurman/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def get_known_repos() -> List[str]:
return [db.name for db in PacmanConfig(conf="/etc/pacman.conf").initialize_alpm().get_syncdbs()]

@staticmethod
def get_packages_from_expac(expac_operation: str, packages_names: Sequence[str], packages_type: PossibleTypes) -> \
def get_packages_from_expac(expac_operation: str, packages_names: List[str], packages_type: PossibleTypes) -> \
List['Package']:
"""
Generates and returns packages from an expac query.
Expand Down Expand Up @@ -895,38 +895,36 @@ def fetch_pkgbuild(self):

# check if repo has ever been fetched
if os.path.isdir(package_dir):
if run("git fetch", shell=True, cwd=package_dir).returncode != 0:
if run(["git", "fetch"], cwd=package_dir).returncode != 0:
logging.error("git fetch of {} failed".format(self.name))
raise ConnectionProblem("git fetch of {} failed".format(self.name))

head = run(
"git rev-parse HEAD", shell=True, stdout=PIPE, universal_newlines=True, cwd=package_dir
["git", "rev-parse", "HEAD"], stdout=PIPE, universal_newlines=True, cwd=package_dir
).stdout.strip()
u = run(
"git rev-parse @{u}", shell=True, stdout=PIPE, universal_newlines=True, cwd=package_dir
["git", "rev-parse", "@{u}"], stdout=PIPE, universal_newlines=True, cwd=package_dir
).stdout.strip()

# if new sources available
if head != u:
if run(
"git reset --hard HEAD && git pull", shell=True, stdout=DEVNULL, stderr=DEVNULL, cwd=package_dir
).returncode != 0:
run(["git", "reset", "-q", "--hard", "HEAD"], stderr=DEVNULL cwd=package_dir)
if run(["git", "pull", "-q"], stderr=DEVNULL, cwd=package_dir).returncode != 0:
logging.error("sources of {} could not be fetched".format(self.name))
raise ConnectionProblem("sources of {} could not be fetched".format(self.name))

# repo has never been fetched
else:
# create package dir
if run(
"install -dm700 '{}'".format(package_dir), shell=True, stdout=DEVNULL, stderr=DEVNULL
).returncode != 0:
try:
os.makedirs(package_dir, mode=0o700, exist_ok=True)
except:
logging.error("Creating package dir of {} failed".format(self.name))
raise InvalidInput("Creating package dir of {} failed".format(self.name))

# clone repo
if run(
"git clone {}/{}.git".format(aurman.aur_utilities.aur_domain, self.pkgbase),
shell=True,
["git", "clone", "{}/{}.git".format(aurman.aur_utilities.aur_domain, self.pkgbase)],
cwd=Package.cache_dir
).returncode != 0:
logging.error("Cloning repo of {} failed".format(self.name))
Expand All @@ -949,26 +947,20 @@ def search_and_fetch_pgp_keys(self, fetch_always: bool = False, keyserver: str =
pgp_keys = Package.getPGPKeys(os.path.join(package_dir, "PKGBUILD"))

for pgp_key in pgp_keys:
is_key_known = run(
"gpg --list-public-keys {}".format(pgp_key), shell=True, stdout=DEVNULL, stderr=DEVNULL
).returncode == 0
if not is_key_known:
if run(["gpg", "--list-keys", pgp_key], stdout=DEVNULL, stderr=DEVNULL).returncode != 0:
if fetch_always or ask_user(
"PGP Key {} found in PKGBUILD of {} and is not known yet. "
"Do you want to import the key?".format(Colors.BOLD(Colors.LIGHT_MAGENTA(pgp_key)),
Colors.BOLD(Colors.LIGHT_MAGENTA(self.name))), True):
if keyserver is None:
if run("gpg --recv-keys {}".format(pgp_key), shell=True).returncode != 0:
if run(["gpg", "--recv-keys", pgp_key]).returncode != 0:
logging.error("Import PGP key {} failed.".format(pgp_key))
raise ConnectionProblem(
"Import PGP key {} failed. "
"This is an error in your GnuPG installation.".format(pgp_key)
)
else:
if run(
"gpg --keyserver {} --recv-keys {}".format(keyserver, pgp_key),
shell=True
).returncode != 0:
if run(["gpg", "--keyserver", keyserver, "--recv-keys", pgp_key]).returncode != 0:
logging.error("Import PGP key {} from {} failed.".format(pgp_key, keyserver))
raise ConnectionProblem(
"Import PGP key {} from {} failed. "
Expand Down Expand Up @@ -1000,28 +992,19 @@ def show_pkgbuild(self, noedit: bool = False, show_changes: bool = False,
raise InvalidInput("Package dir of {} does not exist".format(self.name))

# if aurman dir does not exist - create
if not os.path.isdir(git_aurman_dir):
if run(
"install -dm700 '{}'".format(git_aurman_dir), shell=True, stdout=DEVNULL, stderr=DEVNULL
).returncode != 0:
logging.error("Creating git_aurman_dir of {} failed".format(self.name))
raise InvalidInput("Creating git_aurman_dir of {} failed".format(self.name))
try:
os.makedirs(git_aurman_dir, mode=0o700, exist_ok=True)
except:
logging.error("Creating git_aurman_dir of {} failed".format(self.name))
raise InvalidInput("Creating git_aurman_dir of {} failed".format(self.name))

# if last commit seen hash file does not exist - create
if not os.path.isfile(last_commit_hash_file):
empty_tree_hash = run(
"git hash-object -t tree --stdin < /dev/null",
shell=True,
stdout=PIPE,
stderr=DEVNULL,
universal_newlines=True
).stdout.strip()

with open(last_commit_hash_file, 'w') as f:
f.write(empty_tree_hash)
run(["git", "hash-object", "-t", "tree", "/dev/null"], stdout=f, stderr=DEVNULL)

current_commit_hash = run(
"git rev-parse HEAD", shell=True, stdout=PIPE, stderr=DEVNULL, cwd=package_dir, universal_newlines=True
["git", "rev-parse", "HEAD"], stdout=PIPE, stderr=DEVNULL, cwd=package_dir, universal_newlines=True
).stdout.strip()

# if files have been reviewed
Expand All @@ -1035,7 +1018,7 @@ def show_pkgbuild(self, noedit: bool = False, show_changes: bool = False,
# relevant files are all files besides .SRCINFO
relevant_files = []
files_in_pack_dir = run(
"git ls-files", shell=True, stdout=PIPE, stderr=DEVNULL, universal_newlines=True, cwd=package_dir
["git", "ls-files"], stdout=PIPE, stderr=DEVNULL, universal_newlines=True, cwd=package_dir
).stdout.strip().splitlines()
for file in files_in_pack_dir:
if file != ".SRCINFO":
Expand All @@ -1050,11 +1033,7 @@ def show_pkgbuild(self, noedit: bool = False, show_changes: bool = False,
"".format(Colors.BOLD(Colors.LIGHT_MAGENTA(self.name))),
default_show_changes):

run(
"git diff {} {} -- . ':(exclude).SRCINFO'".format(
last_seen_hash, current_commit_hash
),
shell=True,
run(["git", "diff", last_seen_hash, current_commit_hash, "--", ".", ":(exclude).SRCINFO"],
cwd=package_dir
)
any_changes_seen = True
Expand Down Expand Up @@ -1087,10 +1066,7 @@ def show_pkgbuild(self, noedit: bool = False, show_changes: bool = False,
else:
file = relevant_files[user_input - 1]
if run(
"{} {}".format(
Package.default_editor_path, os.path.join(package_dir, file)
),
shell=True
[Package.default_editor_path, os.path.join(package_dir, file)]
).returncode != 0:
logging.error("Editing {} of {} failed".format(file, self.name))
raise InvalidInput("Editing {} of {} failed".format(file, self.name))
Expand Down Expand Up @@ -1269,13 +1245,13 @@ def get_installed_packages() -> List['Package']:

:return: A list containing the installed packages
"""
repo_packages_names = set(expac("-S", ('n',), ()))
repo_packages_names = set(expac("-S", ['n'], []))

# packages the user wants to install from aur
aur_names = packages_from_other_sources()[0]
repo_packages_names -= aur_names

installed_packages_names = set(expac("-Q", ('n',), ()))
installed_packages_names = set(expac("-Q", ['n'], []))
installed_repo_packages_names = installed_packages_names & repo_packages_names
unclassified_installed_names = installed_packages_names - installed_repo_packages_names

Expand Down Expand Up @@ -1323,7 +1299,7 @@ def get_repo_packages() -> List['Package']:

:return: A list containing the current repo packages
"""
return Package.get_packages_from_expac("-S", (), PossibleTypes.REPO_PACKAGE)
return Package.get_packages_from_expac("-S", [], PossibleTypes.REPO_PACKAGE)

def __init__(self, packages: Sequence['Package']):
self.all_packages_dict = {} # names as keys and packages as values
Expand Down
39 changes: 15 additions & 24 deletions src/aurman/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
import shutil
import sys
from copy import deepcopy
from subprocess import run, DEVNULL
Expand All @@ -20,6 +21,11 @@

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(module)s - %(funcName)s - %(levelname)s - %(message)s')

def rmtree_onerror(*args):
aurman_error(
"Directory {} could not be deleted".format(Colors.BOLD(Colors.LIGHT_MAGENTA(path)))
)
sys.exit(1)

def readconfig() -> None:
"""
Expand Down Expand Up @@ -71,9 +77,9 @@ def show_version() -> None:
"""
# remove colors in case of not terminal
if stdout.isatty():
aurman_note(expac("-Q", ("v",), ("aurman-git", "aurman"))[0])
aurman_note(expac("-Q", ["v"], ["aurman-git", "aurman"])[0])
else:
print(expac("-Q", ("v",), ("aurman-git", "aurman"))[0])
print(expac("-Q", ["v"], ["aurman-git", "aurman"])[0])
sys.exit(0)


Expand All @@ -84,12 +90,12 @@ def redirect_pacman(pacman_args: 'PacmanArgs', args: List[str]) -> None:
:param args: the args unparsed
"""
try:
cmd = ["pacman"]
if pacman_args.operation in [
PacmanOperations.UPGRADE, PacmanOperations.REMOVE, PacmanOperations.DATABASE, PacmanOperations.FILES
]:
run("sudo pacman {}".format(" ".join(["'{}'".format(arg) for arg in args])), shell=True)
else:
run("pacman {}".format(" ".join(["'{}'".format(arg) for arg in args])), shell=True)
cmd = ["sudo", "pacman"]
run(cmd + args)
except InvalidInput:
sys.exit(1)

Expand Down Expand Up @@ -152,13 +158,7 @@ def clean_cache(pacman_args: 'PacmanArgs', aur: bool, repo: bool, clean_force: b
False
):
aurman_status("Deleting cache dir...")
if run(
"rm -rf {}".format(Package.cache_dir), shell=True, stdout=DEVNULL, stderr=DEVNULL
).returncode != 0:
aurman_error(
"Directory {} could not be deleted".format(Colors.BOLD(Colors.LIGHT_MAGENTA(Package.cache_dir)))
)
sys.exit(1)
shutil.rmtree(Package.cache_dir, onerror=rmtree_onerror)
else:
if noconfirm or \
ask_user(
Expand All @@ -170,7 +170,7 @@ def clean_cache(pacman_args: 'PacmanArgs', aur: bool, repo: bool, clean_force: b
aurman_status("Deleting uninstalled clones from cache...")

# if pkgbase not available, the name of the package is the base
expac_returns = expac("-Q -1", ("e", "n"), ())
expac_returns = expac("-Q1", ["e", "n"], [])
dirs_to_not_delete = set()
for expac_return in expac_returns:
pkgbase = expac_return.split("?!")[0]
Expand All @@ -182,16 +182,7 @@ def clean_cache(pacman_args: 'PacmanArgs', aur: bool, repo: bool, clean_force: b
for thing in os.listdir(Package.cache_dir):
if os.path.isdir(os.path.join(Package.cache_dir, thing)):
if thing not in dirs_to_not_delete:
dir_to_delete = os.path.join(Package.cache_dir, thing)
if run(
"rm -rf {}".format(dir_to_delete), shell=True, stdout=DEVNULL, stderr=DEVNULL
).returncode != 0:
aurman_error(
"Directory {} could not be deleted".format(
Colors.BOLD(Colors.LIGHT_MAGENTA(dir_to_delete))
)
)
sys.exit(1)
shutil.rmtree(os.path.join(Package.cache_dir, thing))

if not noconfirm and \
ask_user(
Expand All @@ -206,7 +197,7 @@ def clean_cache(pacman_args: 'PacmanArgs', aur: bool, repo: bool, clean_force: b
if os.path.isdir(os.path.join(Package.cache_dir, thing)):
dir_to_clean = os.path.join(Package.cache_dir, thing)
if run(
"git clean -ffdx", shell=True, stdout=DEVNULL, stderr=DEVNULL, cwd=dir_to_clean
["git", "clean", "-qffdx"], stderr=DEVNULL, cwd=dir_to_clean
).returncode != 0:
aurman_error(
"Directory {} could not be cleaned".format(
Expand Down
10 changes: 5 additions & 5 deletions src/aurman/parsing_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import configparser
import logging
import os
from subprocess import run, DEVNULL
from typing import Tuple, Set, Dict

from aurman.coloring import aurman_error, Colors
Expand All @@ -26,10 +25,11 @@ def read_config() -> 'configparser.ConfigParser':
config_file = os.path.join(config_dir, "aurman_config")

# create config dir if it does not exist
if not os.path.exists(config_dir):
if run("install -dm700 '{}'".format(config_dir), shell=True, stdout=DEVNULL, stderr=DEVNULL).returncode != 0:
logging.error("Creating config dir of aurman failed")
raise InvalidInput("Creating config dir of aurman failed")
try:
os.makedirs(config_dir, mode=0o700, exist_ok=True)
except:
logging.error("Creating config dir of aurman failed")
raise InvalidInput("Creating config dir of aurman failed")

# create empty config if config does not exist
if not os.path.isfile(config_file):
Expand Down
12 changes: 4 additions & 8 deletions src/aurman/utilities.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import shlex
import sys
import termios
import threading
Expand Down Expand Up @@ -34,12 +35,7 @@ def search_and_print(names: Sequence[str], installed_system, pacman_params: str,
return

if not aur:
# escape for pacman
to_escape = list("()+?|{}")
for char in to_escape:
pacman_params = pacman_params.replace(char, "\{}".format(char))

run("pacman {}".format(pacman_params), shell=True)
run(["pacman"] + shlex.split(pacman_params))

if not repo:
# see: https://docs.python.org/3/howto/regex.html
Expand Down Expand Up @@ -179,11 +175,11 @@ def acquire_sudo():

def sudo_loop():
while True:
if run("sudo --non-interactive -v", shell=True, stdout=DEVNULL).returncode != 0:
if run(["sudo", "--non-interactive", "-v"]).returncode != 0:
logging.error("acquire sudo failed")
time.sleep(SudoLoop.timeout)

if run("sudo -v", shell=True).returncode != 0:
if run(["sudo", "-v"]).returncode != 0:
logging.error("acquire sudo failed")
raise InvalidInput("acquire sudo failed")
t = threading.Thread(target=sudo_loop)
Expand Down
Loading