Skip to content

Commit

Permalink
gh-114959: tarfile: do not ignore errors when extract a directory on …
Browse files Browse the repository at this point in the history
…top of a file (GH-114960)

Also, add tests common to tarfile and zipfile.
  • Loading branch information
serhiy-storchaka committed Feb 3, 2024
1 parent b4240fd commit 96bce03
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 1 deletion.
3 changes: 2 additions & 1 deletion Lib/tarfile.py
Expand Up @@ -2456,7 +2456,8 @@ def makedir(self, tarinfo, targetpath):
# later in _extract_member().
os.mkdir(targetpath, 0o700)
except FileExistsError:
pass
if not os.path.isdir(targetpath):
raise

def makefile(self, tarinfo, targetpath):
"""Make a file called targetpath.
Expand Down
155 changes: 155 additions & 0 deletions Lib/test/archiver_tests.py
@@ -0,0 +1,155 @@
"""Tests common to tarfile and zipfile."""

import os
import sys

from test.support import os_helper

class OverwriteTests:

def setUp(self):
os.makedirs(self.testdir)
self.addCleanup(os_helper.rmtree, self.testdir)

def create_file(self, path, content=b''):
with open(path, 'wb') as f:
f.write(content)

def open(self, path):
raise NotImplementedError

def extractall(self, ar):
raise NotImplementedError


def test_overwrite_file_as_file(self):
target = os.path.join(self.testdir, 'test')
self.create_file(target, b'content')
with self.open(self.ar_with_file) as ar:
self.extractall(ar)
self.assertTrue(os.path.isfile(target))
with open(target, 'rb') as f:
self.assertEqual(f.read(), b'newcontent')

def test_overwrite_dir_as_dir(self):
target = os.path.join(self.testdir, 'test')
os.mkdir(target)
with self.open(self.ar_with_dir) as ar:
self.extractall(ar)
self.assertTrue(os.path.isdir(target))

def test_overwrite_dir_as_implicit_dir(self):
target = os.path.join(self.testdir, 'test')
os.mkdir(target)
with self.open(self.ar_with_implicit_dir) as ar:
self.extractall(ar)
self.assertTrue(os.path.isdir(target))
self.assertTrue(os.path.isfile(os.path.join(target, 'file')))
with open(os.path.join(target, 'file'), 'rb') as f:
self.assertEqual(f.read(), b'newcontent')

def test_overwrite_dir_as_file(self):
target = os.path.join(self.testdir, 'test')
os.mkdir(target)
with self.open(self.ar_with_file) as ar:
with self.assertRaises(PermissionError if sys.platform == 'win32'
else IsADirectoryError):
self.extractall(ar)
self.assertTrue(os.path.isdir(target))

def test_overwrite_file_as_dir(self):
target = os.path.join(self.testdir, 'test')
self.create_file(target, b'content')
with self.open(self.ar_with_dir) as ar:
with self.assertRaises(FileExistsError):
self.extractall(ar)
self.assertTrue(os.path.isfile(target))
with open(target, 'rb') as f:
self.assertEqual(f.read(), b'content')

def test_overwrite_file_as_implicit_dir(self):
target = os.path.join(self.testdir, 'test')
self.create_file(target, b'content')
with self.open(self.ar_with_implicit_dir) as ar:
with self.assertRaises(FileNotFoundError if sys.platform == 'win32'
else NotADirectoryError):
self.extractall(ar)
self.assertTrue(os.path.isfile(target))
with open(target, 'rb') as f:
self.assertEqual(f.read(), b'content')

@os_helper.skip_unless_symlink
def test_overwrite_file_symlink_as_file(self):
# XXX: It is potential security vulnerability.
target = os.path.join(self.testdir, 'test')
target2 = os.path.join(self.testdir, 'test2')
self.create_file(target2, b'content')
os.symlink('test2', target)
with self.open(self.ar_with_file) as ar:
self.extractall(ar)
self.assertTrue(os.path.islink(target))
self.assertTrue(os.path.isfile(target2))
with open(target2, 'rb') as f:
self.assertEqual(f.read(), b'newcontent')

@os_helper.skip_unless_symlink
def test_overwrite_broken_file_symlink_as_file(self):
# XXX: It is potential security vulnerability.
target = os.path.join(self.testdir, 'test')
target2 = os.path.join(self.testdir, 'test2')
os.symlink('test2', target)
with self.open(self.ar_with_file) as ar:
self.extractall(ar)
self.assertTrue(os.path.islink(target))
self.assertTrue(os.path.isfile(target2))
with open(target2, 'rb') as f:
self.assertEqual(f.read(), b'newcontent')

@os_helper.skip_unless_symlink
def test_overwrite_dir_symlink_as_dir(self):
# XXX: It is potential security vulnerability.
target = os.path.join(self.testdir, 'test')
target2 = os.path.join(self.testdir, 'test2')
os.mkdir(target2)
os.symlink('test2', target, target_is_directory=True)
with self.open(self.ar_with_dir) as ar:
self.extractall(ar)
self.assertTrue(os.path.islink(target))
self.assertTrue(os.path.isdir(target2))

@os_helper.skip_unless_symlink
def test_overwrite_dir_symlink_as_implicit_dir(self):
# XXX: It is potential security vulnerability.
target = os.path.join(self.testdir, 'test')
target2 = os.path.join(self.testdir, 'test2')
os.mkdir(target2)
os.symlink('test2', target, target_is_directory=True)
with self.open(self.ar_with_implicit_dir) as ar:
self.extractall(ar)
self.assertTrue(os.path.islink(target))
self.assertTrue(os.path.isdir(target2))
self.assertTrue(os.path.isfile(os.path.join(target2, 'file')))
with open(os.path.join(target2, 'file'), 'rb') as f:
self.assertEqual(f.read(), b'newcontent')

@os_helper.skip_unless_symlink
def test_overwrite_broken_dir_symlink_as_dir(self):
target = os.path.join(self.testdir, 'test')
target2 = os.path.join(self.testdir, 'test2')
os.symlink('test2', target, target_is_directory=True)
with self.open(self.ar_with_dir) as ar:
with self.assertRaises(FileExistsError):
self.extractall(ar)
self.assertTrue(os.path.islink(target))
self.assertFalse(os.path.exists(target2))

@os_helper.skip_unless_symlink
def test_overwrite_broken_dir_symlink_as_implicit_dir(self):
target = os.path.join(self.testdir, 'test')
target2 = os.path.join(self.testdir, 'test2')
os.symlink('test2', target, target_is_directory=True)
with self.open(self.ar_with_implicit_dir) as ar:
with self.assertRaises(FileExistsError):
self.extractall(ar)
self.assertTrue(os.path.islink(target))
self.assertFalse(os.path.exists(target2))
33 changes: 33 additions & 0 deletions Lib/test/test_tarfile.py
Expand Up @@ -15,6 +15,7 @@
import unittest.mock
import tarfile

from test import archiver_tests
from test import support
from test.support import os_helper
from test.support import script_helper
Expand Down Expand Up @@ -4135,6 +4136,38 @@ def valueerror_filter(tarinfo, path):
self.expect_exception(TypeError) # errorlevel is not int


class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase):
testdir = os.path.join(TEMPDIR, "testoverwrite")

@classmethod
def setUpClass(cls):
p = cls.ar_with_file = os.path.join(TEMPDIR, 'tar-with-file.tar')
cls.addClassCleanup(os_helper.unlink, p)
with tarfile.open(p, 'w') as tar:
t = tarfile.TarInfo('test')
t.size = 10
tar.addfile(t, io.BytesIO(b'newcontent'))

p = cls.ar_with_dir = os.path.join(TEMPDIR, 'tar-with-dir.tar')
cls.addClassCleanup(os_helper.unlink, p)
with tarfile.open(p, 'w') as tar:
tar.addfile(tar.gettarinfo(os.curdir, 'test'))

p = os.path.join(TEMPDIR, 'tar-with-implicit-dir.tar')
cls.ar_with_implicit_dir = p
cls.addClassCleanup(os_helper.unlink, p)
with tarfile.open(p, 'w') as tar:
t = tarfile.TarInfo('test/file')
t.size = 10
tar.addfile(t, io.BytesIO(b'newcontent'))

def open(self, path):
return tarfile.open(path, 'r')

def extractall(self, ar):
ar.extractall(self.testdir, filter='fully_trusted')


def setUpModule():
os_helper.unlink(TEMPDIR)
os.makedirs(TEMPDIR)
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/test_zipfile/test_core.py
Expand Up @@ -18,6 +18,7 @@
from tempfile import TemporaryFile
from random import randint, random, randbytes

from test import archiver_tests
from test.support import script_helper
from test.support import (
findfile, requires_zlib, requires_bz2, requires_lzma,
Expand Down Expand Up @@ -1687,6 +1688,33 @@ def _test_extract_hackers_arcnames(self, hacknames):
unlink(TESTFN2)


class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase):
testdir = TESTFN

@classmethod
def setUpClass(cls):
p = cls.ar_with_file = TESTFN + '-with-file.zip'
cls.addClassCleanup(unlink, p)
with zipfile.ZipFile(p, 'w') as zipfp:
zipfp.writestr('test', b'newcontent')

p = cls.ar_with_dir = TESTFN + '-with-dir.zip'
cls.addClassCleanup(unlink, p)
with zipfile.ZipFile(p, 'w') as zipfp:
zipfp.mkdir('test')

p = cls.ar_with_implicit_dir = TESTFN + '-with-implicit-dir.zip'
cls.addClassCleanup(unlink, p)
with zipfile.ZipFile(p, 'w') as zipfp:
zipfp.writestr('test/file', b'newcontent')

def open(self, path):
return zipfile.ZipFile(path, 'r')

def extractall(self, ar):
ar.extractall(self.testdir)


class OtherTests(unittest.TestCase):
def test_open_via_zip_info(self):
# Create the ZIP archive
Expand Down
@@ -0,0 +1,2 @@
:mod:`tarfile` no longer ignores errors when trying to extract a directory on
top of a file.

0 comments on commit 96bce03

Please sign in to comment.