Permalink
Cannot retrieve contributors at this time
Fetching contributors…
| #!/usr/bin/python3 | |
| # Copyright (c) 2005-2015 Canonical Ltd | |
| # | |
| # AUTHOR: | |
| # Michael Vogt <mvo@ubuntu.com> | |
| # | |
| # This file is part of unattended-upgrades | |
| # | |
| # unattended-upgrades is free software; you can redistribute it and/or | |
| # modify it under the terms of the GNU General Public License as published | |
| # by the Free Software Foundation; either version 2 of the License, or (at | |
| # your option) any later version. | |
| # | |
| # unattended-upgrades is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
| # General Public License for more details. | |
| # | |
| # You should have received a copy of the GNU General Public License | |
| # along with unattended-upgrades; if not, write to the Free Software | |
| # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | |
| # | |
| import atexit | |
| import copy | |
| import datetime | |
| import errno | |
| import email.charset | |
| import fcntl | |
| import fnmatch | |
| import gettext | |
| import grp | |
| import io | |
| import locale | |
| import logging | |
| import logging.handlers | |
| import re | |
| import os | |
| import select | |
| import signal | |
| import string | |
| import subprocess | |
| import sys | |
| try: | |
| from typing import AbstractSet, Dict, Iterable, List, Tuple | |
| AbstractSet # pyflakes | |
| Dict # pyflakes | |
| Iterable # pyflakes | |
| List # pyflakes | |
| Tuple # pyflakes | |
| except ImportError: | |
| pass | |
| from datetime import date | |
| from email.message import Message | |
| from gettext import gettext as _ | |
| from io import StringIO | |
| from optparse import ( | |
| OptionParser, | |
| SUPPRESS_HELP, | |
| ) | |
| from subprocess import ( | |
| Popen, | |
| PIPE, | |
| ) | |
| import apt | |
| import apt_inst | |
| import apt_pkg | |
| # the reboot required flag file used by packages | |
| REBOOT_REQUIRED_FILE = "/var/run/reboot-required" | |
| MAIL_BINARY = "/usr/bin/mail" | |
| SENDMAIL_BINARY = "/usr/sbin/sendmail" | |
| USERS = "/usr/bin/users" | |
| # no py3 lsb_release in debian :/ | |
| DISTRO_CODENAME = subprocess.check_output( | |
| ["lsb_release", "-c", "-s"], universal_newlines=True).strip() # type: str | |
| DISTRO_ID = subprocess.check_output( | |
| ["lsb_release", "-i", "-s"], universal_newlines=True).strip() # type: str | |
| # progress information is written here | |
| PROGRESS_LOG = "/var/run/unattended-upgrades.progress" | |
| PID_FILE = "/var/run/unattended-upgrades.pid" | |
| LOCK_FILE = "/var/run/unattended-upgrades.lock" | |
| # set from the sigint signal handler | |
| SIGNAL_STOP_REQUEST = False | |
| class LoggingDateTime: | |
| """The date/time representation for the dpkg log file timestamps""" | |
| LOG_DATE_TIME_FMT = "%Y-%m-%d %H:%M:%S" | |
| @classmethod | |
| def as_string(cls): | |
| # type: () -> str | |
| """Return the current date and time as LOG_DATE_TIME_FMT string""" | |
| return datetime.datetime.now().strftime(cls.LOG_DATE_TIME_FMT) | |
| @classmethod | |
| def from_string(cls, logstr): | |
| # type: (str) -> datetime.datetime | |
| """Take a LOG_DATE_TIME_FMT string and return datetime object""" | |
| return datetime.datetime.strptime(logstr, cls.LOG_DATE_TIME_FMT) | |
| class UnknownMatcherError(ValueError): | |
| pass | |
| class UnattendedUpgradesCache(apt.Cache): | |
| def __init__(self, rootdir, allowed_origins): | |
| # type: (str, List[str]) -> None | |
| apt.Cache.__init__(self, rootdir=rootdir) | |
| self.allowed_origins = allowed_origins | |
| # ensure we update the candidate versions | |
| self.candidates_to_adjust = self.get_candidates_to_adjust() | |
| for pkgname, candidate in self.candidates_to_adjust.items(): | |
| self[pkgname].candidate = candidate | |
| def clear(self): | |
| # type: () -> None | |
| apt.Cache.clear(self) | |
| # ensure we update the candidate versions | |
| for pkgname, candidate in self.candidates_to_adjust.items(): | |
| self[pkgname].candidate = candidate | |
| def get_candidates_to_adjust(self): | |
| # type: () -> Dict[str, apt.Version] | |
| """ Get candidate versions needed to be adjusted to match highest | |
| allowed origin | |
| This lets adjusting the origin even if the candidate has a higher | |
| version. This is needed when e.g. a package is available in | |
| the security pocket but there is also a package in the | |
| updates pocket with a higher version number | |
| """ | |
| candidates = {} | |
| for pkg in self: | |
| # important! this avoids downgrades below | |
| if not pkg.is_upgradable: | |
| continue | |
| # check if the candidate is already pointing to a allowed | |
| # origin and if so, do not mess with it | |
| if is_allowed_origin(pkg.candidate, self.allowed_origins): | |
| continue | |
| # check if we have a version in a allowed origin that is | |
| # not the candidate | |
| new_cand = None | |
| for ver in pkg.versions: | |
| # ignore versions that the user marked with priority < 100 | |
| # (and ensure we have a python-apt that supports this) | |
| if (hasattr(ver, "policy_priority") and | |
| ver.policy_priority < 100): | |
| logging.debug("ignoring ver '%s' with priority < 0" % ver) | |
| continue | |
| if is_allowed_origin(ver, self.allowed_origins): | |
| # leave as soon as we have the highest new candidate | |
| new_cand = ver | |
| break | |
| if new_cand and new_cand != pkg.candidate: | |
| logging.debug("adjusting candidate version: '%s'" % new_cand) | |
| candidates[pkg.name] = new_cand | |
| return candidates | |
| class LogInstallProgress(apt.progress.base.InstallProgress): | |
| """ Install progress that writes to self.progress_log | |
| (/var/run/unattended-upgrades.progress by default) | |
| """ | |
| def __init__(self, logfile_dpkg, verbose=False, | |
| progress_log="var/run/unattended-upgrades.progress"): | |
| # type: (str, bool, str) -> None | |
| apt.progress.base.InstallProgress.__init__(self) | |
| self.logfile_dpkg = logfile_dpkg | |
| self.progress_log = os.path.join(apt_pkg.config.find_dir("Dir"), | |
| progress_log) | |
| self.verbose = verbose | |
| self.output_logfd = None # type: int | |
| def status_change(self, pkg, percent, status): | |
| # type: (str, float, str) -> None | |
| with open(self.progress_log, "w") as f: | |
| f.write(_("Progress: %s %% (%s)") % (percent, pkg)) | |
| def _fixup_fds(self): | |
| # () -> None | |
| required_fds = [0, 1, 2, # stdin, stdout, stderr | |
| self.writefd, | |
| self.write_stream.fileno(), | |
| self.statusfd, | |
| self.status_stream.fileno() | |
| ] | |
| # ensure that our required fds close on exec | |
| for fd in required_fds[3:]: | |
| old_flags = fcntl.fcntl(fd, fcntl.F_GETFD) | |
| fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC) | |
| # close all fds | |
| proc_fd = "/proc/self/fd" | |
| if os.path.exists(proc_fd): | |
| error_count = 0 | |
| for fdname in os.listdir(proc_fd): | |
| try: | |
| fd = int(fdname) | |
| except Exception as e: | |
| print("ERROR: can not get fd for '%s'" % fdname) | |
| if fd in required_fds: | |
| continue | |
| try: | |
| os.close(fd) | |
| # print("closed: ", fd) | |
| except OSError as e: | |
| # there will be one fd that can not be closed | |
| # as its the fd from pythons internal diropen() | |
| # so its ok to ignore one close error | |
| error_count += 1 | |
| if error_count > 1: | |
| print("ERROR: os.close(%s): %s" % (fd, e)) | |
| def _redirect_stdin(self): | |
| # type: () -> None | |
| REDIRECT_INPUT = os.devnull | |
| fd = os.open(REDIRECT_INPUT, os.O_RDWR) | |
| os.dup2(fd, 0) | |
| def _redirect_output(self): | |
| # type: () -> None | |
| # do not create log in dry-run mode, just output to stdout/stderr | |
| if not apt_pkg.config.find_b("Debug::pkgDPkgPM", False): | |
| logfd = self._get_logfile_dpkg_fd() | |
| os.dup2(logfd, 1) | |
| os.dup2(logfd, 2) | |
| def _get_logfile_dpkg_fd(self): | |
| # type: () -> int | |
| logfd = os.open( | |
| self.logfile_dpkg, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0o640) | |
| try: | |
| adm_gid = grp.getgrnam("adm").gr_gid | |
| os.fchown(logfd, 0, adm_gid) | |
| except (KeyError, OSError): | |
| pass | |
| return logfd | |
| def update_interface(self): | |
| # type: () -> None | |
| # call super class first | |
| apt.progress.base.InstallProgress.update_interface(self) | |
| self._do_verbose_output_if_needed() | |
| def _do_verbose_output_if_needed(self): | |
| # type: () -> None | |
| # if we are in debug mode, nothing to be more verbose about | |
| if apt_pkg.config.find_b("Debug::pkgDPkgPM", False): | |
| return | |
| # handle verbose | |
| if self.verbose: | |
| if self.output_logfd is None: | |
| self.output_logfd = os.open(self.logfile_dpkg, os.O_RDONLY) | |
| os.lseek(self.output_logfd, 0, os.SEEK_END) | |
| try: | |
| select.select([self.output_logfd], [], [], 0) | |
| # FIXME: this should be OSError, but in py2.7 it is still | |
| # select.error | |
| except select.error as e: | |
| if e.errno != errno.EINTR: # type: ignore | |
| logging.exception("select failed") | |
| # output to stdout in verbose mode only | |
| os.write(1, os.read(self.output_logfd, 1024)) | |
| def _log_in_dpkg_log(self, msg): | |
| # type: (str) -> None | |
| logfd = self._get_logfile_dpkg_fd() | |
| os.write(logfd, msg.encode("utf-8")) | |
| os.close(logfd) | |
| def finish_update(self): | |
| # type: () -> None | |
| self._log_in_dpkg_log("Log ended: %s\n\n" | |
| % LoggingDateTime.as_string()) | |
| def fork(self): | |
| # type: () -> int | |
| self._log_in_dpkg_log("Log started: %s\n" | |
| % LoggingDateTime.as_string()) | |
| pid = os.fork() | |
| if pid == 0: | |
| self._fixup_fds() | |
| self._redirect_stdin() | |
| self._redirect_output() | |
| return pid | |
| class Unlocked: | |
| """ | |
| Context manager for unlocking the apt lock while cache.commit() is run | |
| """ | |
| def __enter__(self): | |
| # type: () -> None | |
| try: | |
| apt_pkg.pkgsystem_unlock() | |
| except Exception: | |
| pass | |
| def __exit__(self, exc_type, exc_value, exc_tb): | |
| # type: (object, object, object) -> None | |
| try: | |
| apt_pkg.pkgsystem_unlock() | |
| except Exception: | |
| pass | |
| def is_dpkg_journal_dirty(): | |
| # type: () -> bool | |
| """ | |
| Return True if the dpkg journal is dirty | |
| (similar to debSystem::CheckUpdates) | |
| """ | |
| d = os.path.join( | |
| os.path.dirname(apt_pkg.config.find_file("Dir::State::status")), | |
| "updates") | |
| for f in os.listdir(d): | |
| if re.match("[0-9]+", f): | |
| return True | |
| return False | |
| def signal_handler(signal, frame): | |
| # type: (int, object) -> None | |
| logging.warning("SIGTERM received, will stop") | |
| global SIGNAL_STOP_REQUEST | |
| SIGNAL_STOP_REQUEST = True | |
| def substitute(line): | |
| # type: (str) -> str | |
| """ substitude known mappings and return a new string | |
| Currently supported ${distro-release} | |
| """ | |
| mapping = {"distro_codename": get_distro_codename(), | |
| "distro_id": get_distro_id()} | |
| return string.Template(line).substitute(mapping) | |
| def get_distro_codename(): | |
| # type: () -> str | |
| return DISTRO_CODENAME | |
| def get_distro_id(): | |
| # type: () -> str | |
| return DISTRO_ID | |
| def get_allowed_origins_legacy(): | |
| # type: () -> List[str] | |
| """ legacy support for old Allowed-Origins var """ | |
| allowed_origins = [] # type: List[str] | |
| key = "Unattended-Upgrade::Allowed-Origins" | |
| try: | |
| for s in apt_pkg.config.value_list(key): | |
| # if there is a ":" use that as seperator, else use spaces | |
| if re.findall(r'(?<!\\):', s): | |
| (distro_id, distro_codename) = re.split(r'(?<!\\):', s) | |
| else: | |
| (distro_id, distro_codename) = s.split() | |
| # unescape "\:" back to ":" | |
| distro_id = re.sub(r'\\:', ':', distro_id) | |
| # escape "," (see LP: #824856) - can this be simpler? | |
| distro_id = re.sub(r'([^\\]),', r'\1\\,', distro_id) | |
| distro_codename = re.sub(r'([^\\]),', r'\1\\,', distro_codename) | |
| # convert to new format | |
| allowed_origins.append("o=%s,a=%s" % (substitute(distro_id), | |
| substitute(distro_codename))) | |
| except ValueError: | |
| logging.error(_("Unable to parse %s." % key)) | |
| raise | |
| return allowed_origins | |
| def get_allowed_origins(): | |
| # type: () -> List[str] | |
| """ return a list of allowed origins from apt.conf | |
| This will take substitutions (like distro_id) into account. | |
| """ | |
| allowed_origins = get_allowed_origins_legacy() | |
| key = "Unattended-Upgrade::Origins-Pattern" | |
| try: | |
| for s in apt_pkg.config.value_list(key): | |
| allowed_origins.append(substitute(s)) | |
| except ValueError: | |
| logging.error(_("Unable to parse %s." % key)) | |
| raise | |
| return allowed_origins | |
| def match_whitelist_string(whitelist, origin): | |
| # type: (str, apt.package.Origin) -> bool | |
| """ | |
| take a whitelist string in the form "origin=Debian,label=Debian-Security" | |
| and match against the given python-apt origin. A empty whitelist string | |
| never matches anything. | |
| """ | |
| whitelist = whitelist.strip() | |
| if whitelist == "": | |
| logging.warning("empty match string matches nothing") | |
| return False | |
| res = True | |
| # make "\," the html quote equivalent | |
| whitelist = whitelist.replace("\,", "%2C") | |
| for token in whitelist.split(","): | |
| # strip and unquote the "," back | |
| (what, value) = [s.strip().replace("%2C", ",") | |
| for s in token.split("=")] | |
| # logging.debug("matching '%s'='%s' against '%s'" % ( | |
| # what, value, origin)) | |
| # support substitution here as well | |
| value = substitute(value) | |
| # first char is apt-cache policy output, send is the name | |
| # in the Release file | |
| if what in ("o", "origin"): | |
| match = fnmatch.fnmatch(origin.origin, value) | |
| elif what in ("l", "label"): | |
| match = fnmatch.fnmatch(origin.label, value) | |
| elif what in ("a", "suite", "archive"): | |
| match = fnmatch.fnmatch(origin.archive, value) | |
| elif what in ("c", "component"): | |
| match = fnmatch.fnmatch(origin.component, value) | |
| elif what in ("site",): | |
| match = fnmatch.fnmatch(origin.site, value) | |
| elif what in ("n", "codename",): | |
| match = fnmatch.fnmatch(origin.codename, value) | |
| else: | |
| raise UnknownMatcherError( | |
| "Unknown whitelist entry for matcher '%s' (token '%s')" % ( | |
| what, token)) | |
| # update res | |
| res = res and match | |
| # logging.debug("matching '%s'='%s' against '%s'" % ( | |
| # what, value, origin)) | |
| return res | |
| def cache_commit(cache, # type: apt.Cache | |
| logfile_dpkg, # type: str | |
| verbose, # type: bool | |
| iprogress=None, # type: apt.progress.base.InstallProgress | |
| ): | |
| # type: (...) -> Tuple[bool, Exception] | |
| """Commit the changes from the given cache to the system""" | |
| # set debconf to NON_INTERACTIVE, redirect output | |
| os.putenv("DEBIAN_FRONTEND", "noninteractive") | |
| # only act if there is anything to do (important to avoid writing | |
| # empty log stanzas) | |
| if len(cache.get_changes()) == 0: | |
| return True, None | |
| error = None | |
| res = False | |
| if iprogress is None: | |
| iprogress = LogInstallProgress(logfile_dpkg, verbose) | |
| with Unlocked(): | |
| try: | |
| res = cache.commit(install_progress=iprogress) | |
| except SystemError as e: | |
| error = e | |
| if verbose: | |
| logging.exception("Exception happened during upgrade.") | |
| return res, error | |
| def upgrade_normal(cache, logfile_dpkg, verbose): | |
| # type: (apt.Cache, str, bool) -> bool | |
| res, error = cache_commit(cache, logfile_dpkg, verbose) | |
| if res: | |
| logging.info(_("All upgrades installed")) | |
| else: | |
| logging.error(_("Installing the upgrades failed!")) | |
| logging.error(_("error message: '%s'"), error) | |
| logging.error(_("dpkg returned a error! See '%s' for details"), | |
| logfile_dpkg) | |
| return res | |
| def upgrade_in_minimal_steps(cache, # type: apt.Cache | |
| pkgs_to_upgrade, # type: List[str] | |
| blacklist, # type: List[str] | |
| whitelist, # type: List[str] | |
| logfile_dpkg="", # type: str | |
| verbose=False, # type: bool | |
| ): | |
| # type: (...) -> bool | |
| install_log = LogInstallProgress(logfile_dpkg, verbose) | |
| # double check any changes we do | |
| allowed_origins = get_allowed_origins() | |
| # pre-calculate set sizes to process sets which are expected to be smaller | |
| # earlier | |
| upgrade_set_sizes = {} | |
| # calculate upgrade sets | |
| for pkgname in pkgs_to_upgrade: | |
| cache.clear() | |
| pkg = cache[pkgname] | |
| if pkg.is_upgradable: | |
| pkg.mark_upgrade() | |
| elif not pkg.is_installed: | |
| pkg.mark_install() | |
| else: | |
| continue | |
| upgrade_set_sizes[pkgname] = len(cache.get_changes()) | |
| cache.clear() | |
| # to upgrade contains the package names | |
| to_upgrade = set(pkgs_to_upgrade) | |
| while True: | |
| # find smallest set | |
| # | |
| # u-u could just skip generating the true smallest sets and upgrade the | |
| # packages in the order of increasing expected upgrade set size, but | |
| # that could generate slightly bigger sets and a broken package could | |
| # leave the system with a bigger broken set | |
| smallest_partition = [] # type: List[str] | |
| for pkgname in sorted(upgrade_set_sizes, key=upgrade_set_sizes.get): | |
| if pkgname not in to_upgrade: | |
| # pkg is upgraded in a previous set | |
| continue | |
| if SIGNAL_STOP_REQUEST: | |
| logging.warning("SIGNAL received, stopping") | |
| return False | |
| pkg = cache[pkgname] | |
| if pkg.is_upgradable: | |
| pkg.mark_upgrade(from_user=not pkg.is_auto_installed) | |
| elif not pkg.is_installed: | |
| pkg.mark_install(from_user=False) | |
| else: | |
| continue | |
| # double check that we are not running into side effects like | |
| # what could have been caused LP: #1020680 | |
| if not check_changes_for_sanity(cache, allowed_origins, blacklist, | |
| whitelist): | |
| logging.info("While building minimal partition: " | |
| "cache has not allowed changes") | |
| cache.clear() | |
| continue | |
| changes = [p.name for p in cache.get_changes()] | |
| if not changes: | |
| continue | |
| if len(changes) == 1: | |
| logging.debug("found leaf package %s" % pkg.name) | |
| smallest_partition = changes | |
| break | |
| if (len(changes) < len(smallest_partition) or | |
| len(smallest_partition) == 0): | |
| logging.debug("found partition of size %s (%s)" % ( | |
| len(changes), changes)) | |
| smallest_partition = changes | |
| cache.clear() | |
| if len(smallest_partition) == 0: | |
| logging.info( | |
| _("Some packages can not be upgraded: %s") % (to_upgrade)) | |
| break | |
| # write progress log information | |
| if len(pkgs_to_upgrade) > 0: | |
| percent = ((len(pkgs_to_upgrade) - len(to_upgrade)) / | |
| float(len(pkgs_to_upgrade)) * 100.0) | |
| else: | |
| percent = 100.0 | |
| install_log.status_change(pkg=",".join(smallest_partition), | |
| percent=percent, | |
| status="") | |
| # apply changes | |
| logging.debug("applying set %s" % smallest_partition) | |
| rewind_cache(cache, [cache[name] for name in smallest_partition]) | |
| # one last check, just to be sure | |
| if not check_changes_for_sanity(cache, allowed_origins, blacklist, | |
| whitelist): | |
| logging.error("Failed sanity check while attempting to commit: " | |
| "this should never happen") | |
| cache.clear() | |
| return False | |
| res, error = cache_commit(cache, logfile_dpkg, verbose, install_log) | |
| if error: | |
| if verbose: | |
| logging.exception("Exception happened during upgrade.") | |
| logging.error(_("Installing the upgrades failed!")) | |
| logging.error(_("error message: '%s'"), error) | |
| logging.error(_("dpkg returned a error! See '%s' for details"), | |
| logfile_dpkg) | |
| return False | |
| to_upgrade = to_upgrade - set(smallest_partition) | |
| logging.debug("left to upgrade %s" % to_upgrade) | |
| if len(to_upgrade) == 0: | |
| logging.info(_("All upgrades installed")) | |
| break | |
| return True | |
| def is_allowed_origin(ver, allowed_origins): | |
| # type: (apt.package.Version, List[str]) -> bool | |
| if not ver: | |
| return False | |
| for origin in ver.origins: | |
| for allowed in allowed_origins: | |
| if match_whitelist_string(allowed, origin): | |
| return True | |
| return False | |
| def is_pkgname_in_blacklist(pkgname, blacklist, pkgs_kept_back): | |
| # type: (str, List[str], List[str]) -> bool | |
| for blacklist_regexp in blacklist: | |
| if re.match(blacklist_regexp, pkgname): | |
| logging.debug("skipping blacklisted package '%s'" % pkgname) | |
| pkgs_kept_back.append(pkgname) | |
| return True | |
| return False | |
| def is_pkgname_in_whitelist(pkgname, whitelist): | |
| # type: (str, List[str]) -> bool | |
| # a empty whitelist means the user does not want to use this feature | |
| if not whitelist: | |
| return True | |
| for whitelist_regexp in whitelist: | |
| if re.match(whitelist_regexp, pkgname): | |
| logging.debug("only upgrading the following package '%s'" % | |
| pkgname) | |
| return True | |
| return False | |
| def check_changes_for_sanity(cache, allowed_origins, blacklist, whitelist, | |
| desired_pkg=None): | |
| # type: (apt.Cache, List[str], List[str], List[str], apt.Package) -> bool | |
| if cache._depcache.broken_count != 0: | |
| return False | |
| # If there are no packages to be installed they were kept back | |
| if cache.install_count == 0: | |
| return False | |
| for pkg in cache.get_changes(): | |
| if pkg.marked_delete: | |
| logging.debug("pkg '%s' now marked delete" % pkg.name) | |
| return False | |
| if pkg.marked_install or pkg.marked_upgrade: | |
| # apt will never fallback from a trusted to a untrusted | |
| # origin so its good enough if we have a single trusted one | |
| if not any([o.trusted for o in pkg.candidate.origins]): | |
| logging.debug("pkg '%s' is untrusted" % pkg.name) | |
| return False | |
| if not is_allowed_origin(pkg.candidate, allowed_origins): | |
| logging.debug("pkg '%s' not in allowed origin" % pkg.name) | |
| return False | |
| if is_pkgname_in_blacklist(pkg.name, blacklist, []): | |
| logging.debug("pkg '%s' package has been blacklisted" % | |
| pkg.name) | |
| return False | |
| # a strict whitelist will not allow any changes not in the | |
| # whitelist, most people will want the relaxed whitelist | |
| # that whitelists a package but pulls in the package | |
| # dependencies | |
| strict_whitelist = apt_pkg.config.find_b( | |
| "Unattended-Upgrade::Package-Whitelist-Strict", False) | |
| if (strict_whitelist and | |
| not is_pkgname_in_whitelist(pkg.name, whitelist)): | |
| logging.debug("pkg '%s' package is not whitelisted" % | |
| pkg.name) | |
| return False | |
| if pkg._pkg.selected_state == apt_pkg.SELSTATE_HOLD: | |
| logging.debug("pkg '%s' is on hold" % pkg.name) | |
| return False | |
| # check if the package is unsafe to upgrade unattended | |
| ignore_require_restart = apt_pkg.config.find_b( | |
| "Unattended-Upgrade::IgnoreAppsRequireRestart", False) | |
| upgrade_requires = pkg.candidate.record.get("Upgrade-Requires") | |
| if (pkg.marked_upgrade and ignore_require_restart is False and | |
| upgrade_requires == "app-restart"): | |
| logging.debug("pkg '%s' requires app-restart, not safe to " | |
| "upgrade unattended") | |
| return False | |
| # check that the package we want to upgrade is in the change set | |
| if desired_pkg and desired_pkg not in cache.get_changes(): | |
| return False | |
| return True | |
| def pkgname_from_deb(debfile): | |
| # type: (str) -> str | |
| # FIXME: add error checking here | |
| try: | |
| control = apt_inst.DebFile(debfile).control.extractdata("control") | |
| sections = apt_pkg.TagSection(control) | |
| return sections["Package"] | |
| except (IOError, SystemError) as e: | |
| logging.error("failed to read deb file '%s' (%s)" % (debfile, e)) | |
| # dumb fallback | |
| return debfile.split("_")[0] | |
| def get_md5sum_for_file_in_deb(deb_file, conf_file): | |
| # type: (str, str) -> str | |
| dpkg_cmd = ["dpkg-deb", "--fsys-tarfile", deb_file] | |
| tar_cmd = ["tar", "-x", "-O", "-f", "-", "." + conf_file] | |
| md5_cmd = ["md5sum"] | |
| dpkg_p = Popen(dpkg_cmd, stdout=PIPE) | |
| tar_p = Popen(tar_cmd, stdin=dpkg_p.stdout, stdout=PIPE, | |
| universal_newlines=True) | |
| md5_p = Popen(md5_cmd, stdin=tar_p.stdout, stdout=PIPE, | |
| universal_newlines=True) | |
| pkg_md5sum = md5_p.communicate()[0].split()[0] | |
| return pkg_md5sum | |
| # prefix is *only* needed for the build-in tests | |
| def conffile_prompt(destFile, prefix=""): | |
| # type: (str, str) -> bool | |
| logging.debug("check_conffile_prompt('%s')" % destFile) | |
| pkgname = pkgname_from_deb(destFile) | |
| # get the conffiles for the /var/lib/dpkg/status file | |
| status_file = apt_pkg.config.find("Dir::State::status") | |
| tagfile = apt_pkg.TagFile(open(status_file, "r")) | |
| conffiles = "" | |
| for section in tagfile: | |
| if section.get("Package") == pkgname: | |
| logging.debug("found pkg: %s" % pkgname) | |
| if "Conffiles" in section: | |
| conffiles = section.get("Conffiles") | |
| break | |
| # get conffile value from pkg, its ok if the new version | |
| # does not have conffiles anymore | |
| pkg_conffiles = "" | |
| try: | |
| deb = apt_inst.DebFile(destFile) | |
| pkg_conffiles = deb.control.extractdata("conffiles").strip().decode( | |
| "utf-8") | |
| except SystemError as e: | |
| print(_("Apt returned an error, exiting")) | |
| print(_("error message: '%s'") % e) | |
| logging.error(_("Apt returned an error, exiting")) | |
| logging.error(_("error message: '%s'"), e) | |
| sys.exit(1) | |
| except LookupError as e: | |
| logging.debug("No conffiles in deb '%s' (%s)" % (destFile, e)) | |
| # Conffiles: | |
| # /etc/bash_completion.d/m-a c7780fab6b14d75ca54e11e992a6c11c | |
| dpkg_status_conffiles = {} | |
| for line in conffiles.splitlines(): | |
| # ignore empty lines | |
| line = line.strip() | |
| if not line: | |
| continue | |
| # show what we do | |
| logging.debug("conffile line: '%s'", line) | |
| li = line.split() | |
| conf_file = li[0] | |
| md5 = li[1] | |
| if len(li) > 2: | |
| obs = li[2] | |
| else: | |
| obs = None | |
| # ignore if conffile is obsolete | |
| if obs == "obsolete": | |
| continue | |
| # ignore state "newconffile" until its clearer if there | |
| # might be a dpkg prompt (LP: #936870) | |
| if md5 == "newconffile": | |
| continue | |
| if not pkg_conffiles or conf_file not in pkg_conffiles.split("\n"): | |
| logging.debug("'%s' not in package conffiles '%s'" % ( | |
| conf_file, pkg_conffiles)) | |
| continue | |
| # record for later | |
| dpkg_status_conffiles[conf_file] = md5 | |
| # the package replaces a directory wih a configuration file | |
| # | |
| # if the package changed this way it is safe to assume that | |
| # the transition happens without showing a prompt but if the admin | |
| # created the directory the admin will need to resolve it after | |
| # being notified about the unexpected prompt | |
| if os.path.isdir(prefix + conf_file): | |
| continue | |
| # test against the installed file, if the local file got deleted | |
| # by the admin thats ok but it may still trigger a conffile prompt | |
| # (see debian #788049) | |
| current_md5 = "" | |
| if os.path.exists(prefix + conf_file): | |
| with open(prefix + conf_file, 'rb') as fb: | |
| current_md5 = apt_pkg.md5sum(fb) | |
| logging.debug("current md5: %s" % current_md5) | |
| # hashes are the same, no conffile prompt | |
| if current_md5 == md5: | |
| continue | |
| # calculate md5sum from the deb (may take a bit) | |
| pkg_md5sum = get_md5sum_for_file_in_deb(destFile, conf_file) | |
| logging.debug("pkg_md5sum: %s" % pkg_md5sum) | |
| # the md5sum in the deb is unchanged, this will not | |
| # trigger a conffile prompt | |
| if pkg_md5sum == md5: | |
| continue | |
| # if we made it to this point: | |
| # current_md5 != pkg_md5sum != md5 | |
| # and that will trigger a conffile prompt, we can | |
| # stop processing at this point and just return True | |
| return True | |
| # now check if there are conffiles in the pkg that where not there | |
| # in the previous version in the dpkg status file | |
| if pkg_conffiles: | |
| for conf_file in pkg_conffiles.split("\n"): | |
| if (conf_file not in dpkg_status_conffiles and | |
| os.path.exists(prefix + conf_file)): | |
| logging.debug("found conffile '%s' in new pkg but on dpkg " | |
| "status" % conf_file) | |
| pkg_md5sum = get_md5sum_for_file_in_deb(destFile, conf_file) | |
| with open(prefix + conf_file, 'rb') as fp: | |
| if pkg_md5sum != apt_pkg.md5sum(fp): | |
| return True | |
| return False | |
| def dpkg_conffile_prompt(): | |
| # type: () -> bool | |
| if "DPkg::Options" not in apt_pkg.config: | |
| return True | |
| options = apt_pkg.config.value_list("DPkg::Options") | |
| for option in options: | |
| option = option.strip() | |
| if option in ["--force-confold", "--force-confnew"]: | |
| return False | |
| return True | |
| def rewind_cache(cache, pkgs_to_upgrade): | |
| # type: (apt.Cache, List[apt.Package]) -> None | |
| """ set the cache back to the state with packages_to_upgrade """ | |
| cache.clear() | |
| for pkg2 in pkgs_to_upgrade: | |
| pkg2.mark_install(from_user=not pkg2.is_auto_installed) | |
| if cache.broken_count > 0: | |
| raise AssertionError("rewind_cache created a broken cache") | |
| def host(): | |
| # type: () -> str | |
| return os.uname()[1] | |
| # *sigh* textwrap is nice, but it breaks "linux-image" into two | |
| # seperate lines | |
| def wrap(t, width=70, subsequent_indent=""): | |
| # type: (str, int, str) -> str | |
| out = "" | |
| for s in t.split(): | |
| if (len(out) - out.rfind("\n")) + len(s) > width: | |
| out += "\n" + subsequent_indent | |
| out += s + " " | |
| return out | |
| def setup_apt_listchanges(conf="/etc/apt/listchanges.conf"): | |
| # type: (str) -> None | |
| """ deal with apt-listchanges """ | |
| # apt-listchanges will always send a mail if there is a mail address | |
| # set in the config regardless of the frontend used, so set it to | |
| # mail if we have a sendmail and to none if not (as it appears to | |
| # not check if sendmail is there or not), debian bug #579733 | |
| if os.path.exists(SENDMAIL_BINARY): | |
| os.environ["APT_LISTCHANGES_FRONTEND"] = "mail" | |
| else: | |
| os.environ["APT_LISTCHANGES_FRONTEND"] = "none" | |
| def _send_mail_using_mailx(from_address, to_address, subject, body): | |
| # type: (str, str, str, str) -> int | |
| # ensure that the body is a byte stream and that we do not | |
| # break on encoding errors (the default error mode is "strict") | |
| encoded_body = body.encode( | |
| locale.getpreferredencoding(False), errors="replace") | |
| # we use a binary pipe to stdin to ensure we do not break on | |
| # unicode encoding errors (e.g. because the user is running a | |
| # ascii only system like the buildds) | |
| mail = subprocess.Popen( | |
| [MAIL_BINARY, "-r", from_address, "-s", subject, to_address], | |
| stdin=subprocess.PIPE, universal_newlines=False) | |
| mail.stdin.write(encoded_body) | |
| mail.stdin.close() | |
| ret = mail.wait() | |
| return ret | |
| def _send_mail_using_sendmail(from_address, to_address, subject, body): | |
| # type: (str, str, str, str) -> int | |
| # format as a proper mail | |
| msg = Message() | |
| msg['Subject'] = subject | |
| msg['From'] = from_address | |
| msg['To'] = to_address | |
| msg['Auto-Submitted'] = "auto-generated" | |
| # order is important here, Message() first, then Charset() | |
| # then msg.set_charset() | |
| charset = email.charset.Charset("utf-8") | |
| charset.body_encoding = email.charset.QP # type: ignore | |
| msg.set_payload(body, charset) | |
| # and send it away | |
| sendmail = subprocess.Popen( | |
| [SENDMAIL_BINARY, "-oi", "-t"], | |
| stdin=subprocess.PIPE, universal_newlines=True) | |
| sendmail.stdin.write(msg.as_string()) | |
| sendmail.stdin.close() | |
| ret = sendmail.wait() | |
| return ret | |
| def send_summary_mail(pkgs, res, pkgs_kept_back, mem_log, dpkg_log_content): | |
| # type: (str, bool, List[str], StringIO, str) -> None | |
| """ send mail (if configured in Unattended-Upgrade::Mail) """ | |
| to_email = apt_pkg.config.find("Unattended-Upgrade::Mail", "") | |
| if not to_email: | |
| return | |
| if not os.path.exists(MAIL_BINARY) and not os.path.exists(SENDMAIL_BINARY): | |
| logging.error(_("No '/usr/bin/mail' or '/usr/sbin/sendmail'," | |
| "can not send mail. " | |
| "You probably want to install the 'mailx' package.")) | |
| return | |
| # if the operation was successful and the user has requested to get | |
| # mails on on errors, just exit here | |
| if (res and | |
| # see Debian Bug #703621 | |
| not re.search("^WARNING:", mem_log.getvalue(), re.MULTILINE) and | |
| apt_pkg.config.find_b( | |
| "Unattended-Upgrade::MailOnlyOnError", False)): | |
| return | |
| # Check if reboot-required flag is present | |
| reboot_flag_str = _( | |
| "[reboot required]") if os.path.isfile(REBOOT_REQUIRED_FILE) else "" | |
| # Check if packages are kept on hold | |
| hold_flag_str = _("[package on hold]") if pkgs_kept_back else "" | |
| logging.debug("Sending mail to '%s'" % to_email) | |
| subject = _( | |
| "{hold_flag}{reboot_flag} unattended-upgrades result for " | |
| "'{machine}': {result}").format( | |
| hold_flag=hold_flag_str, reboot_flag=reboot_flag_str, | |
| machine=host(), result=res).strip() | |
| body = _("Unattended upgrade returned: %s\n\n") % res | |
| if os.path.isfile(REBOOT_REQUIRED_FILE): | |
| body += _( | |
| "Warning: A reboot is required to complete this upgrade.\n\n") | |
| if res: | |
| body += _("Packages that were upgraded:\n") | |
| else: | |
| body += _("Packages that attempted to upgrade:\n") | |
| body += " " + wrap(pkgs, 70, " ") | |
| body += "\n" | |
| if pkgs_kept_back: | |
| body += _("Packages with upgradable origin but kept back:\n") | |
| body += " " + wrap(" ".join(pkgs_kept_back), 70, " ") | |
| body += "\n" | |
| body += "\n" | |
| if dpkg_log_content: | |
| body += _("Package installation log:") + "\n" | |
| body += dpkg_log_content | |
| body += "\n\n" | |
| body += _("Unattended-upgrades log:\n") | |
| body += mem_log.getvalue() | |
| from_email = apt_pkg.config.find("Unattended-Upgrade::Sender", "root") | |
| if os.path.exists(SENDMAIL_BINARY): | |
| ret = _send_mail_using_sendmail(from_email, to_email, subject, body) | |
| elif os.path.exists(MAIL_BINARY): | |
| ret = _send_mail_using_mailx(from_email, to_email, subject, body) | |
| else: | |
| raise AssertionError( | |
| "This should never be reached, if we are here we either " | |
| "have sendmail or mailx. Please report this as a bug.") | |
| logging.debug("mail returned: %s", ret) | |
| def do_install(cache, # type: apt.Cache | |
| pkgs_to_upgrade, # type: List[apt.Package] | |
| blacklisted_pkgs, # type: List[str] | |
| whitelisted_pkgs, # type: List[str] | |
| options, # type: Options | |
| logfile_dpkg, # type: str | |
| ): | |
| # type: (...) -> bool | |
| # set debconf to NON_INTERACTIVE, redirect output | |
| os.putenv("DEBIAN_FRONTEND", "noninteractive") | |
| setup_apt_listchanges() | |
| logging.info(_("Writing dpkg log to '%s'"), logfile_dpkg) | |
| marked_delete = [pkg for pkg in cache.get_changes() if pkg.marked_delete] | |
| if marked_delete: | |
| raise AssertionError( | |
| "Internal error. The following packages are marked for " | |
| "removal:%s" % "".join([pkg.name for pkg in marked_delete])) | |
| pkg_install_success = False | |
| try: | |
| if (options.minimal_upgrade_steps or | |
| # COMPAT with the mispelling | |
| (apt_pkg.config.find_b( | |
| "Unattended-Upgrades::MinimalSteps", True) and | |
| apt_pkg.config.find_b( | |
| "Unattended-Upgrade::MinimalSteps", True))): | |
| # try upgrade all "pkgs" in minimal steps | |
| pkg_install_success = upgrade_in_minimal_steps( | |
| cache, [pkg.name for pkg in pkgs_to_upgrade], | |
| blacklisted_pkgs, whitelisted_pkgs, logfile_dpkg, | |
| options.verbose or options.debug) | |
| else: | |
| pkg_install_success = upgrade_normal( | |
| cache, logfile_dpkg, options.verbose or options.debug) | |
| except Exception as e: | |
| # print unhandled exceptions here this way, while stderr is redirected | |
| os.write(2, ("Exception: %s\n" % e).encode('utf-8')) | |
| pkg_install_success = False | |
| return pkg_install_success | |
| def _setup_alternative_rootdir(rootdir): | |
| # type: (str) -> None | |
| # clear system unattended-upgrade stuff | |
| apt_pkg.config.clear("Unattended-Upgrade") | |
| # read rootdir (taken from apt.Cache, but we need to run it | |
| # here before the cache gets initialized | |
| if os.path.exists(rootdir + "/etc/apt/apt.conf"): | |
| apt_pkg.read_config_file(apt_pkg.config, | |
| rootdir + "/etc/apt/apt.conf") | |
| if os.path.isdir(rootdir + "/etc/apt/apt.conf.d"): | |
| apt_pkg.read_config_dir(apt_pkg.config, | |
| rootdir + "/etc/apt/apt.conf.d") | |
| logdir = os.path.join(rootdir, "var", "log", "unattended-upgrades") | |
| if not os.path.exists(logdir): | |
| os.makedirs(logdir) | |
| apt.apt_pkg.config.set("Unattended-Upgrade::LogDir", logdir) | |
| def _get_logdir(): | |
| # type: () -> str | |
| logdir = apt_pkg.config.find_dir( | |
| "Unattended-Upgrade::LogDir", | |
| # COMPAT only | |
| apt_pkg.config.find_dir("APT::UnattendedUpgrades::LogDir", | |
| "/var/log/unattended-upgrades/")) | |
| return logdir | |
| def _setup_logging(options): | |
| # type: (Options) -> StringIO | |
| # ensure this is run only once | |
| if len(logging.root.handlers) > 0: | |
| return None | |
| # init the logging | |
| logdir = _get_logdir() | |
| logfile = os.path.join( | |
| logdir, | |
| apt_pkg.config.find( | |
| "Unattended-Upgrade::LogFile", | |
| # COMPAT only | |
| apt_pkg.config.find("APT::UnattendedUpgrades::LogFile", | |
| "unattended-upgrades.log"))) | |
| if not options.dry_run and not os.path.exists(logdir): | |
| os.makedirs(logdir) | |
| logging.basicConfig(level=logging.INFO, | |
| format='%(asctime)s %(levelname)s %(message)s', | |
| filename=logfile) | |
| # additional logging | |
| logger = logging.getLogger() | |
| mem_log = StringIO() | |
| if options.apt_debug: | |
| apt_pkg.config.set("Debug::pkgProblemResolver", "1") | |
| apt_pkg.config.set("Debug::pkgDepCache::AutoInstall", "1") | |
| if options.debug: | |
| logger.setLevel(logging.DEBUG) | |
| stdout_handler = logging.StreamHandler(sys.stdout) | |
| logger.addHandler(stdout_handler) | |
| elif options.verbose: | |
| logger.setLevel(logging.INFO) | |
| stdout_handler = logging.StreamHandler(sys.stdout) | |
| logger.addHandler(stdout_handler) | |
| if apt_pkg.config.find("Unattended-Upgrade::Mail", ""): | |
| mem_log_handler = logging.StreamHandler(mem_log) | |
| logger.addHandler(mem_log_handler) | |
| # Configure syslog if necessary | |
| syslogEnable = apt_pkg.config.find_b("Unattended-Upgrade::SyslogEnable", | |
| False) | |
| if syslogEnable: | |
| syslogFacility = apt_pkg.config.find( | |
| "Unattended-Upgrade::SyslogFacility", | |
| "daemon") | |
| syslogHandler = logging.handlers.SysLogHandler( | |
| address='/dev/log', | |
| facility=syslogFacility) | |
| syslogHandler.setFormatter( | |
| logging.Formatter("unattended-upgrade: %(message)s")) | |
| known = syslogHandler.facility_names.keys() # type: ignore | |
| if syslogFacility.lower() in known: | |
| logger.addHandler(syslogHandler) | |
| logging.info("Enabled logging to syslog via %s facility " | |
| % syslogFacility) | |
| else: | |
| logging.warning("Syslog facility %s was not found" | |
| % syslogFacility) | |
| return mem_log | |
| def get_blacklisted_pkgs(): | |
| # type: () -> List[str] | |
| return apt_pkg.config.value_list("Unattended-Upgrade::Package-Blacklist") | |
| def get_whitelisted_pkgs(): | |
| # type: () -> List[str] | |
| return apt_pkg.config.value_list("Unattended-Upgrade::Package-Whitelist") | |
| def logged_in_users(): | |
| # type: () -> AbstractSet[str] | |
| """Return a list of logged in users""" | |
| # the "users" command always returns a single line with: | |
| # "user1, user1, user2" | |
| users = subprocess.check_output( | |
| USERS, universal_newlines=True).rstrip('\n') | |
| return set(users.split()) | |
| def reboot_if_requested_and_needed(): | |
| # type: () -> None | |
| """auto-reboot (if required and the config for this is set)""" | |
| if not os.path.exists(REBOOT_REQUIRED_FILE): | |
| return | |
| if not apt_pkg.config.find_b( | |
| "Unattended-Upgrade::Automatic-Reboot", False): | |
| return | |
| # see if we need to check for logged in users | |
| if not apt_pkg.config.find_b( | |
| "Unattended-Upgrade::Automatic-Reboot-WithUsers", True): | |
| users = logged_in_users() | |
| if users: | |
| msg = gettext.ngettext( | |
| "Found %s, but not rebooting because %s is logged in." % ( | |
| REBOOT_REQUIRED_FILE, users), | |
| "Found %s, but not rebooting because %s are logged in." % ( | |
| REBOOT_REQUIRED_FILE, users), | |
| len(users)) | |
| logging.warning(msg) | |
| return | |
| # reboot at the specified time | |
| when = apt_pkg.config.find( | |
| "Unattended-Upgrade::Automatic-Reboot-Time", "now") | |
| logging.warning("Found %s, rebooting" % REBOOT_REQUIRED_FILE) | |
| subprocess.call(["/sbin/shutdown", "-r", when]) | |
| def write_stamp_file(): | |
| # type: () -> None | |
| statedir = os.path.join(apt_pkg.config.find_dir("Dir::State"), "periodic") | |
| if not os.path.exists(statedir): | |
| os.makedirs(statedir) | |
| with open(os.path.join(statedir, "unattended-upgrades-stamp"), "w"): | |
| pass | |
| def try_to_upgrade(pkg, # type: apt.Package | |
| pkgs_to_upgrade, # type: List[apt.Package] | |
| pkgs_kept_back, # type: List[str] | |
| cache, # type: apt.Cache | |
| allowed_origins, # type: List[str] | |
| blacklisted_pkgs, # type: List[str] | |
| whitelisted_pkgs, # type: List[str] | |
| ): | |
| # type: (...) -> None | |
| try: | |
| pkg.mark_upgrade(from_user=not pkg.is_auto_installed) | |
| if check_changes_for_sanity(cache, allowed_origins, | |
| blacklisted_pkgs, whitelisted_pkgs, | |
| pkg): | |
| # add to packages to upgrade | |
| pkgs_to_upgrade.append(pkg) | |
| # re-eval pkgs_kept_back as the resolver may fail to | |
| # directly upgrade a pkg, but that may work during | |
| # a subsequent operation, see debian bug #639840 | |
| for pkgname in pkgs_kept_back: | |
| if (cache[pkgname].marked_install or | |
| cache[pkgname].marked_upgrade): | |
| pkgs_kept_back.remove(pkgname) | |
| pkgs_to_upgrade.append(cache[pkgname]) | |
| else: | |
| logging.debug("sanity check failed") | |
| rewind_cache(cache, pkgs_to_upgrade) | |
| pkgs_kept_back.append(pkg.name) | |
| except SystemError as e: | |
| # can't upgrade | |
| logging.warning( | |
| _("package '%s' upgradable but fails to " | |
| "be marked for upgrade (%s)"), pkg.name, e) | |
| rewind_cache(cache, pkgs_to_upgrade) | |
| pkgs_kept_back.append(pkg.name) | |
| def calculate_upgradable_pkgs(cache, # type: apt.Cache | |
| options, # type: Options | |
| allowed_origins, # type: List[str] | |
| blacklisted_pkgs, # type: List[str] | |
| whitelisted_pkgs, # type: List[str] | |
| ): | |
| # type: (...) -> Tuple[List[apt.Package], List[str]] | |
| pkgs_to_upgrade = [] # type: List[apt.Package] | |
| pkgs_kept_back = [] # type: List[str] | |
| # now do the actual upgrade | |
| for pkg in cache: | |
| if options.debug and pkg.is_upgradable: | |
| logging.debug("Checking: %s (%s)" % ( | |
| pkg.name, getattr(pkg.candidate, "origins", []))) | |
| if (pkg.is_upgradable and | |
| not is_pkgname_in_blacklist(pkg.name, blacklisted_pkgs, | |
| pkgs_kept_back) and | |
| is_pkgname_in_whitelist(pkg.name, whitelisted_pkgs) and | |
| is_allowed_origin(pkg.candidate, allowed_origins)): | |
| try_to_upgrade(pkg, | |
| pkgs_to_upgrade, | |
| pkgs_kept_back, | |
| cache, | |
| allowed_origins, | |
| blacklisted_pkgs, | |
| whitelisted_pkgs) | |
| return pkgs_to_upgrade, pkgs_kept_back | |
| def get_dpkg_log_content(logfile_dpkg, install_start_time): | |
| # type: (str, datetime.datetime) -> str | |
| logging.debug("Extracting content from '%s' since '%s'" % ( | |
| logfile_dpkg, install_start_time)) | |
| content = [] | |
| found_start = False | |
| with io.open(logfile_dpkg, encoding='utf-8', errors='replace') as fp: | |
| # read until we find the last "Log started: " | |
| for line in fp.readlines(): | |
| # scan for the first entry we need (minimal-step mode | |
| # creates a new stanza for each individual install) | |
| if not found_start and line.startswith("Log started: "): | |
| stanza_start = LoggingDateTime.from_string( | |
| line[len("Log started: "):-1]) | |
| if stanza_start >= install_start_time: | |
| found_start = True | |
| if found_start: | |
| content.append(line) | |
| return "".join(content) | |
| def get_auto_removable(cache): | |
| # type: (apt.Cache) -> AbstractSet[str] | |
| return set([pkg.name for pkg in cache | |
| if pkg.is_auto_removable]) | |
| def do_auto_remove(cache, auto_removable, logfile_dpkg, | |
| verbose=False, dry_run=False): | |
| # type: (apt.Cache, Iterable[str], str, bool, bool) -> bool | |
| if not auto_removable: | |
| return True | |
| for pkgname in auto_removable: | |
| if not dry_run: | |
| logging.debug("marking %s for remove" % pkgname) | |
| cache[pkgname].mark_delete() | |
| logging.info(_("Packages that are auto removed: '%s'"), | |
| " ".join(sorted(auto_removable))) | |
| # do it | |
| res, error = cache_commit(cache, logfile_dpkg, verbose) | |
| if res: | |
| logging.info(_("Packages were successfully auto-removed")) | |
| else: | |
| logging.error(_("Auto-removing the packages failed!")) | |
| logging.error(_("Error message: '%s'"), error) | |
| logging.error(_("dpkg returned an error! See '%s' for details"), | |
| logfile_dpkg) | |
| return res | |
| def clean_downloaded_packages(fetcher): | |
| # type: (apt_pkg.Acquire) -> None | |
| archivedir = os.path.dirname( | |
| apt_pkg.config.find_dir("Dir::Cache::archives")) | |
| for item in fetcher.items: | |
| if os.path.dirname(os.path.abspath(item.destfile)) == archivedir: | |
| try: | |
| os.unlink(item.destfile) | |
| except OSError: | |
| pass | |
| def is_update_day(): | |
| # type: () -> bool | |
| # check if patch days are configured | |
| patch_days = apt_pkg.config.value_list("Unattended-Upgrade::Update-Days") | |
| if not patch_days: | |
| return True | |
| # validate patch days | |
| today = date.today() | |
| # abbreviated localized dayname | |
| if today.strftime("%a") in patch_days: | |
| return True | |
| # full localized dayname | |
| if today.strftime("%A") in patch_days: | |
| return True | |
| # by number (Sun: 0, Mon: 1, ...) | |
| if today.strftime("%w") in patch_days: | |
| return True | |
| # today is not a patch day | |
| logging.info( | |
| "Skipping update check: today is '%s,%s,%s' but patch days are '%s'", | |
| today.strftime("%w"), today.strftime("%a"), today.strftime("%A"), | |
| patch_days) | |
| return False | |
| def main(options, rootdir=""): | |
| # type: (Options, str) -> int | |
| # useful for testing | |
| if rootdir: | |
| _setup_alternative_rootdir(rootdir) | |
| # setup logging | |
| mem_log = _setup_logging(options) | |
| # check if today is a patch day | |
| if not is_update_day(): | |
| return 0 | |
| # format (origin, archive), e.g. ("Ubuntu","dapper-security") | |
| allowed_origins = get_allowed_origins() | |
| # pkgs that are (for some reason) not safe to install | |
| blacklisted_pkgs = get_blacklisted_pkgs() | |
| logging.info(_("Initial blacklisted packages: %s"), | |
| " ".join(blacklisted_pkgs)) | |
| # install only these packages regardless of other upgrades available | |
| whitelisted_pkgs = get_whitelisted_pkgs() | |
| logging.info(_("Initial whitelisted packages: %s"), | |
| " ".join(whitelisted_pkgs)) | |
| logging.info(_("Starting unattended upgrades script")) | |
| # lock for the shutdown check | |
| shutdown_lock = apt_pkg.get_lock(LOCK_FILE) | |
| if shutdown_lock < 0: | |
| logging.error("Lock file is already taken, exiting") | |
| sys.exit(1) | |
| # display available origin | |
| logging.info(_("Allowed origins are: %s"), allowed_origins) | |
| # see debian #776752 | |
| install_start_time = datetime.datetime.now().replace(microsecond=0) | |
| # check if the journal is dirty and if so, take emergceny action | |
| # the alternative is to leave the system potentially unsecure until | |
| # the user comes in and fixes | |
| if (is_dpkg_journal_dirty() and | |
| apt_pkg.config.find_b( | |
| "Unattended-Upgrade::AutoFixInterruptedDpkg", True)): | |
| # ensure the dpkg database is not already locked (LP: #754330) | |
| admindir = os.path.dirname(apt_pkg.config.find("Dir::State::Status")) | |
| lockfd = apt_pkg.get_lock(os.path.join(admindir, "lock"), False) | |
| if lockfd > 0: | |
| logging.warning( | |
| _("Unclean dpkg state detected, trying to correct")) | |
| print(_("Unclean dpkg state detected, trying to correct")) | |
| env = copy.copy(os.environ) | |
| env["DEBIAN_FRONTEND"] = "noninteractive" | |
| try: | |
| os.close(lockfd) | |
| output = subprocess.check_output( | |
| ["dpkg", "--force-confold", "--configure", "-a"], | |
| env=env, | |
| universal_newlines=True) | |
| except subprocess.CalledProcessError as e: | |
| output = e.output | |
| logging.warning(_("dpkg --configure -a output:\n%s"), output) | |
| else: | |
| logging.debug("Unclean dpkg state, but locked, another package " | |
| "manager working?") | |
| # check and get lock | |
| try: | |
| apt_pkg.pkgsystem_lock() | |
| except SystemError as e: | |
| logging.error(_("Lock could not be acquired (another package " | |
| "manager running?)")) | |
| print(_("Cache lock can not be acquired, exiting")) | |
| sys.exit(1) | |
| # get a cache | |
| try: | |
| cache = UnattendedUpgradesCache(rootdir=rootdir, | |
| allowed_origins=allowed_origins) | |
| except SystemError as error: | |
| print(_("Apt returned an error, exiting")) | |
| print(_("error message: '%s'") % error) | |
| logging.error(_("Apt returned an error, exiting")) | |
| logging.error(_("error message: '%s'"), error) | |
| sys.exit(1) | |
| if cache._depcache.broken_count > 0: | |
| print(_("Cache has broken packages, exiting")) | |
| logging.error(_("Cache has broken packages, exiting")) | |
| sys.exit(1) | |
| # FIXME: make this into a ContextManager | |
| # be nice when calculating the upgrade as its pretty CPU intensive | |
| old_priority = os.nice(0) | |
| try: | |
| # Check that we will be able to restore the priority | |
| os.nice(-1) | |
| os.nice(20) | |
| except OSError as e: | |
| if e.errno in (errno.EPERM, errno.EACCES): | |
| pass | |
| else: | |
| raise | |
| # speed things up with latest apt | |
| actiongroup = apt_pkg.ActionGroup(cache._depcache) | |
| actiongroup # pyflakes | |
| # find out about the packages that are upgradable (in an allowed_origin) | |
| pkgs_to_upgrade, pkgs_kept_back = calculate_upgradable_pkgs( | |
| cache, options, allowed_origins, blacklisted_pkgs, whitelisted_pkgs) | |
| pkgs_to_upgrade.sort(key=lambda p: p.name) | |
| pkgs = "\n".join([pkg.name for pkg in pkgs_to_upgrade]) | |
| logging.debug("pkgs that look like they should be upgraded: %s" % pkgs) | |
| # FIXME: make this into a ContextManager | |
| # stop being nice | |
| os.nice(old_priority - os.nice(0)) | |
| # download what looks good | |
| if options.debug: | |
| fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress()) | |
| else: | |
| fetcher = apt_pkg.Acquire() | |
| list = apt_pkg.SourceList() | |
| list.read_main_list() | |
| recs = cache._records | |
| pm = apt_pkg.PackageManager(cache._depcache) | |
| # don't start downloading during shutdown | |
| # TODO: download files one by one and check for stop request after each of | |
| # them | |
| if SIGNAL_STOP_REQUEST: | |
| logging.warning("SIGNAL received, stopping") | |
| return 1 | |
| try: | |
| pm.get_archives(fetcher, list, recs) | |
| except SystemError as e: | |
| logging.error(_("GetArchives() failed: '%s'"), e) | |
| try: | |
| res = fetcher.run() | |
| logging.debug("fetch.run() result: %s", res) | |
| except SystemError as e: | |
| logging.error("fetch.run() result: %s", e) | |
| if options.download_only: | |
| return 0 | |
| if dpkg_conffile_prompt(): | |
| # now check the downloaded debs for conffile conflicts and build | |
| # a blacklist | |
| for item in fetcher.items: | |
| logging.debug("%s" % item) | |
| if item.status == item.STAT_ERROR: | |
| print(_("An error occurred: '%s'") % item.error_text) | |
| logging.error(_("An error occurred: '%s'"), item.error_text) | |
| if not item.complete: | |
| print(_("The URI '%s' failed to download, aborting") % | |
| item.desc_uri) | |
| logging.error(_("The URI '%s' failed to download, aborting"), | |
| item.desc_uri) | |
| sys.exit(1) | |
| if not os.path.exists(item.destfile): | |
| print(_("Download finished, but file '%s' not there?!?") % | |
| item.destfile) | |
| logging.error("Download finished, but file '%s' not " | |
| "there?!?", item.destfile) | |
| sys.exit(1) | |
| if not item.is_trusted: | |
| blacklisted_pkgs.append(pkgname_from_deb(item.destfile)) | |
| if conffile_prompt(item.destfile): | |
| # skip package (means to re-run the whole marking again | |
| # and making sure that the package will not be pulled in by | |
| # some other package again!) | |
| # | |
| # print to stdout to ensure that this message is part of | |
| # the cron mail (only if no summary mail is requested) | |
| email = apt_pkg.config.find("Unattended-Upgrade::Mail", "") | |
| if not email: | |
| print(_("Package '%s' has conffile prompt and needs " | |
| "to be upgraded manually") % | |
| pkgname_from_deb(item.destfile)) | |
| # log to the logfile | |
| logging.warning(_("Package '%s' has conffile prompt and " | |
| "needs to be upgraded manually"), | |
| pkgname_from_deb(item.destfile)) | |
| blacklisted_pkgs.append(pkgname_from_deb(item.destfile)) | |
| pkgs_kept_back.append(pkgname_from_deb(item.destfile)) | |
| # redo the selection about the packages to upgrade based on the new | |
| # blacklist | |
| logging.debug("blacklist: %s" % blacklisted_pkgs) | |
| # whitelist | |
| logging.debug("whitelist: %s" % whitelisted_pkgs) | |
| # find out about the packages that are upgradable (in a allowed_origin) | |
| if len(blacklisted_pkgs) > 0 or len(whitelisted_pkgs) > 0: | |
| cache.clear() | |
| old_pkgs_to_upgrade = pkgs_to_upgrade[:] | |
| pkgs_to_upgrade = [] | |
| for pkg in old_pkgs_to_upgrade: | |
| logging.debug("Checking the black and whitelist: %s" % | |
| (pkg.name)) | |
| pkg.mark_upgrade(from_user=not pkg.is_auto_installed) | |
| if check_changes_for_sanity(cache, | |
| allowed_origins, | |
| blacklisted_pkgs, | |
| whitelisted_pkgs): | |
| pkgs_to_upgrade.append(pkg) | |
| else: | |
| if not (pkg.name in pkgs_kept_back): | |
| pkgs_kept_back.append(pkg.name) | |
| logging.info(_("package '%s' not upgraded"), pkg.name) | |
| cache.clear() | |
| for pkg2 in pkgs_to_upgrade: | |
| pkg2.mark_upgrade(from_user=not pkg2.is_auto_installed) | |
| else: | |
| logging.debug("dpkg is configured not to cause conffile prompts") | |
| # auto-removal | |
| previous_autoremovals = get_auto_removable(cache) | |
| if apt_pkg.config.find_b( | |
| "Unattended-Upgrade::Remove-Unused-Dependencies", False): | |
| pending_autoremovals = previous_autoremovals | |
| else: | |
| pending_autoremovals = set() | |
| # exit if there is nothing to do and nothing to report | |
| if (len(pending_autoremovals) == 0 and | |
| len(pkgs_to_upgrade) == 0 and | |
| len(pkgs_kept_back) == 0): | |
| logging.info(_("No packages found that can be upgraded unattended " | |
| "and no pending auto-removals")) | |
| # FIXME: DRY violation, write_stamp_file() is used below as well | |
| write_stamp_file() | |
| # check if we couldn't reboot on previous run because | |
| # a user was logged-in at this time | |
| os.close(shutdown_lock) | |
| # never reboot during a dry run | |
| if not options.dry_run: | |
| reboot_if_requested_and_needed() | |
| return 0 | |
| # check if its configured for install on shutdown, if so, the | |
| # environment UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN will | |
| # be set by the unatteded-upgrades-shutdown script | |
| if ("UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN" not in os.environ and | |
| apt_pkg.config.find_b( | |
| "Unattended-Upgrade::InstallOnShutdown", False)): | |
| logger = logging.getLogger() | |
| logger.debug("Configured to install on shutdown, so exiting now") | |
| os.close(shutdown_lock) | |
| return 0 | |
| # check if we are in dry-run mode | |
| if options.dry_run: | |
| logging.info("Option --dry-run given, *not* performing real actions") | |
| apt_pkg.config.set("Debug::pkgDPkgPM", "1") | |
| # do the install based on the new list of pkgs | |
| pkgs = " ".join([pkg.name for pkg in pkgs_to_upgrade]) | |
| logging.info(_("Packages that will be upgraded: %s"), pkgs) | |
| # get log | |
| logfile_dpkg = os.path.join(_get_logdir(), 'unattended-upgrades-dpkg.log') | |
| if not os.path.exists(logfile_dpkg): | |
| with open(logfile_dpkg, 'w'): | |
| pass | |
| # only perform install step if we actually have packages to install | |
| pkg_install_success = True | |
| if len(pkgs_to_upgrade) > 0: | |
| # do install | |
| pkg_install_success = do_install(cache, | |
| pkgs_to_upgrade, | |
| blacklisted_pkgs, | |
| whitelisted_pkgs, | |
| options, | |
| logfile_dpkg) | |
| # now check if any auto-removing needs to be done | |
| cache = UnattendedUpgradesCache( | |
| rootdir=rootdir, allowed_origins=allowed_origins) | |
| if cache._depcache.broken_count > 0: | |
| print(_("Cache has broken packages, exiting")) | |
| logging.error(_("Cache has broken packages, exiting")) | |
| sys.exit(1) | |
| # the user wants *all* auto-removals to be removed | |
| # (unless u-u got signalled to stop gracefully quickly) | |
| if ((apt_pkg.config.find_b( | |
| "Unattended-Upgrade::Remove-Unused-Dependencies", False) and | |
| not SIGNAL_STOP_REQUEST)): | |
| auto_removals = get_auto_removable(cache) | |
| pkg_install_success = pkg_install_success and do_auto_remove( | |
| cache, auto_removals, logfile_dpkg, | |
| options.verbose or options.debug, | |
| options.dry_run) | |
| # the user wants *only new* auto-removals to be removed | |
| elif apt_pkg.config.find_b( | |
| "Unattended-Upgrade::Remove-New-Unused-Dependencies", True): | |
| # calculate the new auto-removals | |
| new_pending_autoremovals = get_auto_removable(cache) | |
| auto_removals = new_pending_autoremovals - previous_autoremovals | |
| pkg_install_success = pkg_install_success and do_auto_remove( | |
| cache, auto_removals, logfile_dpkg, | |
| options.verbose or options.debug, | |
| options.dry_run) | |
| logging.debug("InstCount=%i DelCount=%i BrokenCount=%i" | |
| % (cache._depcache.inst_count, | |
| cache._depcache.del_count, | |
| cache._depcache.broken_count)) | |
| # send a mail (if needed) | |
| if not options.dry_run: | |
| log_content = get_dpkg_log_content(logfile_dpkg, install_start_time) | |
| send_summary_mail( | |
| pkgs, pkg_install_success, pkgs_kept_back, mem_log, log_content) | |
| # clean after success install (if needed) | |
| keep_key = "Unattended-Upgrade::Keep-Debs-After-Install" | |
| if (not apt_pkg.config.find_b(keep_key, False) and | |
| not options.dry_run and | |
| pkg_install_success): | |
| clean_downloaded_packages(fetcher) | |
| # FIXME: DRY violation, write_stamp_file() is used above as well | |
| # write timestamp file | |
| write_stamp_file() | |
| os.close(shutdown_lock) | |
| # check if the user wants a reboot | |
| if not options.dry_run: | |
| reboot_if_requested_and_needed() | |
| if pkg_install_success: | |
| return 0 | |
| else: | |
| return 1 | |
| class Options: | |
| def __init__(self): | |
| self.download_only = False | |
| self.dry_run = False | |
| self.debug = False | |
| self.apt_debug = False | |
| self.verbose = False | |
| self.minimal_upgrade_steps = False | |
| if __name__ == "__main__": | |
| localesApp = "unattended-upgrades" | |
| localesDir = "/usr/share/locale" | |
| gettext.bindtextdomain(localesApp, localesDir) | |
| gettext.textdomain(localesApp) | |
| # this ensures the commandline is logged in /var/log/apt/history.log | |
| apt_pkg.config.set("Commandline::AsString", " ".join(sys.argv)) | |
| # init the options | |
| parser = OptionParser() | |
| parser.add_option("-d", "--debug", | |
| action="store_true", default=False, | |
| help=_("print debug messages")) | |
| parser.add_option("", "--apt-debug", | |
| action="store_true", default=False, | |
| help=_("make apt/libapt print verbose debug messages")) | |
| parser.add_option("-v", "--verbose", | |
| action="store_true", default=False, | |
| help=_("print info messages")) | |
| parser.add_option("", "--dry-run", | |
| action="store_true", default=False, | |
| help=_("Simulation, download but do not install")) | |
| parser.add_option("", "--download-only", | |
| action="store_true", default=False, | |
| help=_("Only download, do not even try to install.")) | |
| parser.add_option("", "--minimal-upgrade-steps", | |
| action="store_true", default=False, | |
| help=_("Upgrade in minimal steps (and allow " | |
| "interrupting with SIGTERM")) | |
| parser.add_option("", "--minimal_upgrade_steps", | |
| action="store_true", | |
| help=SUPPRESS_HELP, | |
| default=False) | |
| (options, args) = parser.parse_args() # type: ignore | |
| if os.getuid() != 0: | |
| print(_("You need to be root to run this application")) | |
| sys.exit(1) | |
| # ensure that we are not killed when the terminal goes away e.g. on | |
| # shutdown | |
| signal.signal(signal.SIGHUP, signal.SIG_IGN) | |
| # setup signal handler for graceful stopping | |
| signal.signal(signal.SIGTERM, signal_handler) | |
| # write pid to let other processes find this one | |
| pidf = os.path.join(apt_pkg.config.find_dir("Dir"), | |
| "var", "run", "unattended-upgrades.pid") | |
| # clean up pid file on exit | |
| with open(pidf, "w") as fp: | |
| fp.write("%s" % os.getpid()) | |
| atexit.register(os.remove, pidf) | |
| # run the main code | |
| sys.exit(main(options)) |