Skip to content

Commit

Permalink
Merge pull request #33 from rohanpm/rpm2cpio
Browse files Browse the repository at this point in the history
Support extracting RPMs with rpm2cpio
  • Loading branch information
rohanpm committed Jul 26, 2020
2 parents 01cfa73 + bc456fd commit c763f4e
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ sudo: false
language: python

before_install:
- sudo apt-get install -y rpm
- sudo apt-get install -y rpm cpio

install: pip install tox

Expand Down
96 changes: 94 additions & 2 deletions alt_src/alt_src.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import shutil
import simplejson as json
import six
from six.moves import configparser
from six.moves import configparser, shlex_quote
from six.moves import cStringIO as StringIO
from six.moves.urllib.parse import urlencode
from six.moves.urllib.request import Request, urlopen
Expand Down Expand Up @@ -1971,17 +1971,33 @@ def acquire_lock(lock_file_path, wait_time, sleep_interval, logger):


def explode_srpm(srpm, destdir=None, logfile=None):
# explode our srpm to the given directory
# explode our srpm to the given directory using the best possible means:
# - by installing the RPM
# - or falling back to rpm2cpio

header = koji.get_rpm_header(srpm)
if header[rpm.RPMTAG_SOURCEPACKAGE] != 1:
# we checked this earlier, but since we're about to rpm -i it,
# let's check again
raise SanityError("%s is not a source package" % srpm)

if destdir is None:
destdir = os.getcwd()
else:
destdir = os.path.abspath(destdir)
koji.ensuredir(destdir)

try:
return explode_srpm_install(srpm, destdir, logfile)
except CommandError:
logging.getLogger("altsrc").exception("Could not install SRPM, trying rpm2cpio")

return explode_srpm_cpio(srpm, header, destdir, logfile)


def explode_srpm_install(srpm, destdir=None, logfile=None):
# explode our srpm to the given directory by installing it

cmd = ['rpm', '--nosignature', '-i', '--define', '_topdir %s' % destdir, srpm]
#print "Running: %r" % cmd
popts = {'close_fds':True}
Expand All @@ -1994,6 +2010,82 @@ def explode_srpm(srpm, destdir=None, logfile=None):
raise CommandError("command failed: %r" % cmd)



def explode_srpm_cpio(srpm, header, destdir=None, logfile=None):
# explode our srpm to the given directory by extracting with rpm2cpio

popts = {'close_fds': True}
if logfile:
popts['stdout'] = logfile
popts['stderr'] = subprocess.STDOUT

cmd = (
'set -o pipefail; '
'rpm2cpio %s | '
'cpio --extract --make-directories --preserve-modification-time --unconditional'
) % shlex_quote(os.path.abspath(srpm))

# Note: /bin/sh is not guaranteed to understand "pipefail", hence explicit
# usage of bash
proc = subprocess.Popen(['/bin/bash', '-c', cmd], cwd=destdir, **popts)

ret = proc.wait()
if ret:
raise CommandError("command failed: %r" % cmd)

# As we did not install the RPM, usual SPECS/SOURCES redirections hardcoded into
# RPM for installing SRPMS didn't happen; we need to do this ourselves.
relocate_sources(header, destdir)


def spec_from_headers(headers):
"""Given RPM headers, decide which file is the RPM's .spec file.
See also headerFindSpec in RPM.
"""
fileinfo = zip(headers[rpm.RPMTAG_BASENAMES], headers[rpm.RPMTAG_FILEFLAGS])

# RPM can produce strs or bytes depending on version, make it consistent
fileinfo = [(six.ensure_text(basename), flags)
for (basename, flags) in fileinfo]

for (basename, flags) in fileinfo:
if flags & rpm.RPMFILE_SPECFILE:
return basename

# If no explicitly marked spec file, we use the first one with
# matching filename.
for (basename, _) in fileinfo:
if basename.endswith('.spec'):
return basename


def relocate_sources(headers, dir):
"""Relocate SRPM files from rpm2cpio into the structure typically
used by RPM (i.e. 'SOURCES' and 'SPECS' directories).
See also rpmRelocateSrpmFileList in RPM.
"""
specdir = os.path.join(dir, 'SPECS')
sourcedir = os.path.join(dir, 'SOURCES')
koji.ensuredir(specdir)
koji.ensuredir(sourcedir)

specfile = spec_from_headers(headers)

for basename in headers[rpm.RPMTAG_BASENAMES]:
# note rpm may give bytes or strs depending on version
basename = six.ensure_text(basename)

# Every file goes into either SPECS or SOURCES.
src = os.path.join(dir, basename)
if basename == specfile:
destdir = specdir
else:
destdir = sourcedir
os.rename(src, os.path.join(destdir, basename))


def wipe_git_dir(dirname):
for fname in os.listdir(dirname):
if fname == '.git':
Expand Down
104 changes: 104 additions & 0 deletions tests/test_explode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os

import rpm
import koji
import mock

from alt_src.alt_src import (
explode_srpm,
explode_srpm_cpio,
explode_srpm_install,
spec_from_headers,
CommandError,
)


TESTS_PATH = os.path.dirname(__file__)
RPMS_PATH = os.path.join(TESTS_PATH, "data", "rpms")


def test_explode_install(tmpdir):
"""explode_srpm_install correctly unpacks files in the typical case."""

srpm = os.path.join(RPMS_PATH, "fake-1.1-22.src.rpm")

# Simulate that we're exploding into an existing git checkout.
tmpdir.mkdir(".git")
tmpdir.join(".git").join("config").write("foobar")

# SRPM can be exploded without raising
explode_srpm_install(srpm, str(tmpdir))

# Destination directory contains expected files
output_files = []
for (dirpath, dirnames, filenames) in os.walk(str(tmpdir)):
dirpath = os.path.relpath(dirpath, str(tmpdir))
for filename in filenames:
output_files.append(os.path.join(dirpath, filename))

# It should extract exactly the expected files and should not touch unrelated files
assert sorted(output_files) == [
".git/config",
"SOURCES/foo.txt",
"SPECS/fake.spec",
]


def test_explode_cpio(tmpdir):
"""explode_srpm_cpio correctly unpacks files in the typical case."""

srpm = os.path.join(RPMS_PATH, "fake-1.1-22.src.rpm")

# Simulate that we're exploding into an existing git checkout.
tmpdir.mkdir(".git")
tmpdir.join(".git").join("config").write("foobar")

# SRPM can be exploded without raising
header = koji.get_rpm_header(srpm)
explode_srpm_cpio(srpm, header, str(tmpdir))

# Destination directory contains expected files
output_files = []
for (dirpath, dirnames, filenames) in os.walk(str(tmpdir)):
dirpath = os.path.relpath(dirpath, str(tmpdir))
for filename in filenames:
output_files.append(os.path.join(dirpath, filename))

# It should extract exactly the expected files and should not touch unrelated files
assert sorted(output_files) == [
".git/config",
"SOURCES/foo.txt",
"SPECS/fake.spec",
]


def test_explode_fallback():
"""explode_srpm tries "rpm -i" and falls back to rpm2cpio"""

srpm = os.path.join(RPMS_PATH, "fake-1.1-22.src.rpm")

with mock.patch('alt_src.alt_src.explode_srpm_install') as mock_install:
mock_install.side_effect = CommandError('oops, did not work')
with mock.patch('alt_src.alt_src.explode_srpm_cpio') as mock_cpio:
# It should run without raising
explode_srpm(srpm)

# It should have tried both methods
mock_install.assert_called_once()
mock_cpio.assert_called_once()


def test_unflagged_spec():
"""spec_from_headers falls back to filename heuristic in case of missing flags."""

headers = {
# mix of strs and bytes intentionally used here since rpm can produce both
rpm.RPMTAG_BASENAMES: ["somefile.patch", b"otherfile.spec", b"otherfile2.spec"],
rpm.RPMTAG_FILEFLAGS: [0, 0, 0],
}

found = spec_from_headers(headers)

# When no file was explicitly flagged as a spec file, it should use the first
# file whose name ended in .spec - exactly compatible with logic built in to rpm.
assert found == "otherfile.spec"

0 comments on commit c763f4e

Please sign in to comment.