Skip to content

Commit

Permalink
type annotate twisted.test.test_lockfile
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Sep 8, 2023
1 parent 8321b80 commit 6102c57
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 41 deletions.
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,6 @@ module = [
'twisted.test.test_formmethod',
'twisted.test.test_htb',
'twisted.test.test_iosim',
'twisted.test.test_lockfile',
'twisted.trial._asyncrunner',
'twisted.trial._dist.distreporter',
'twisted.trial._dist.disttrial',
Expand Down
84 changes: 44 additions & 40 deletions src/twisted/test/test_lockfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
"""
Tests for L{twisted.python.lockfile}.
"""

from __future__ import annotations

import errno
import os
from unittest import skipIf, skipUnless

from typing_extensions import NoReturn

from twisted.python import lockfile
from twisted.python.reflect import requireModule
from twisted.python.runtime import platform
Expand All @@ -35,7 +37,7 @@ class UtilTests(TestCase):
Tests for the helper functions used to implement L{FilesystemLock}.
"""

def test_symlinkEEXIST(self):
def test_symlinkEEXIST(self) -> None:
"""
L{lockfile.symlink} raises L{OSError} with C{errno} set to L{EEXIST}
when an attempt is made to create a symlink which already exists.
Expand All @@ -49,7 +51,7 @@ def test_symlinkEEXIST(self):
platform.isWindows(),
"special rename EIO handling only necessary and correct on " "Windows.",
)
def test_symlinkEIOWindows(self):
def test_symlinkEIOWindows(self) -> None:
"""
L{lockfile.symlink} raises L{OSError} with C{errno} set to L{EIO} when
the underlying L{rename} call fails with L{EIO}.
Expand All @@ -60,14 +62,14 @@ def test_symlinkEIOWindows(self):
"""
name = self.mktemp()

def fakeRename(src, dst):
def fakeRename(src: str, dst: str) -> NoReturn:
raise OSError(errno.EIO, None)

self.patch(lockfile, "rename", fakeRename)
exc = self.assertRaises(IOError, lockfile.symlink, name, "foo")
self.assertEqual(exc.errno, errno.EIO)

def test_readlinkENOENT(self):
def test_readlinkENOENT(self) -> None:
"""
L{lockfile.readlink} raises L{OSError} with C{errno} set to L{ENOENT}
when an attempt is made to read a symlink which does not exist.
Expand All @@ -80,7 +82,7 @@ def test_readlinkENOENT(self):
platform.isWindows(),
"special readlink EACCES handling only necessary and " "correct on Windows.",
)
def test_readlinkEACCESWindows(self):
def test_readlinkEACCESWindows(self) -> None:
"""
L{lockfile.readlink} raises L{OSError} with C{errno} set to L{EACCES}
on Windows when the underlying file open attempt fails with C{EACCES}.
Expand All @@ -91,23 +93,23 @@ def test_readlinkEACCESWindows(self):
"""
name = self.mktemp()

def fakeOpen(path, mode):
def fakeOpen(path: str, mode: str) -> NoReturn:
raise OSError(errno.EACCES, None)

self.patch(lockfile, "_open", fakeOpen)
exc = self.assertRaises(IOError, lockfile.readlink, name)
self.assertEqual(exc.errno, errno.EACCES)

@skipIf(skipKill, skipKillReason)
def test_kill(self):
def test_kill(self) -> None:
"""
L{lockfile.kill} returns without error if passed the PID of a
process which exists and signal C{0}.
"""
lockfile.kill(os.getpid(), 0)

@skipIf(skipKill, skipKillReason)
def test_killESRCH(self):
def test_killESRCH(self) -> None:
"""
L{lockfile.kill} raises L{OSError} with errno of L{ESRCH} if
passed a PID which does not correspond to any process.
Expand All @@ -116,7 +118,7 @@ def test_killESRCH(self):
exc = self.assertRaises(OSError, lockfile.kill, 2 ** 31 - 1, 0)
self.assertEqual(exc.errno, errno.ESRCH)

def test_noKillCall(self):
def test_noKillCall(self) -> None:
"""
Verify that when L{lockfile.kill} does end up as None (e.g. on Windows
without pywin32), it doesn't end up being called and raising a
Expand All @@ -129,8 +131,8 @@ def test_noKillCall(self):


class LockingTests(TestCase):
def _symlinkErrorTest(self, errno):
def fakeSymlink(source, dest):
def _symlinkErrorTest(self, errno: int) -> None:
def fakeSymlink(source: str, dest: str) -> NoReturn:
raise OSError(errno, None)

self.patch(lockfile, "symlink", fakeSymlink)
Expand All @@ -140,7 +142,7 @@ def fakeSymlink(source, dest):
exc = self.assertRaises(OSError, lock.lock)
self.assertEqual(exc.errno, errno)

def test_symlinkError(self):
def test_symlinkError(self) -> None:
"""
An exception raised by C{symlink} other than C{EEXIST} is passed up to
the caller of L{FilesystemLock.lock}.
Expand All @@ -151,7 +153,7 @@ def test_symlinkError(self):
platform.isWindows(),
"POSIX-specific error propagation not expected on Windows.",
)
def test_symlinkErrorPOSIX(self):
def test_symlinkErrorPOSIX(self) -> None:
"""
An L{OSError} raised by C{symlink} on a POSIX platform with an errno of
C{EACCES} or C{EIO} is passed to the caller of L{FilesystemLock.lock}.
Expand All @@ -162,7 +164,7 @@ def test_symlinkErrorPOSIX(self):
self._symlinkErrorTest(errno.EACCES)
self._symlinkErrorTest(errno.EIO)

def test_cleanlyAcquire(self):
def test_cleanlyAcquire(self) -> None:
"""
If the lock has never been held, it can be acquired and the C{clean}
and C{locked} attributes are set to C{True}.
Expand All @@ -173,7 +175,7 @@ def test_cleanlyAcquire(self):
self.assertTrue(lock.clean)
self.assertTrue(lock.locked)

def test_cleanlyRelease(self):
def test_cleanlyRelease(self) -> None:
"""
If a lock is released cleanly, it can be re-acquired and the C{clean}
and C{locked} attributes are set to C{True}.
Expand All @@ -189,7 +191,7 @@ def test_cleanlyRelease(self):
self.assertTrue(lock.clean)
self.assertTrue(lock.locked)

def test_cannotLockLocked(self):
def test_cannotLockLocked(self) -> None:
"""
If a lock is currently locked, it cannot be locked again.
"""
Expand All @@ -201,15 +203,15 @@ def test_cannotLockLocked(self):
self.assertFalse(secondLock.lock())
self.assertFalse(secondLock.locked)

def test_uncleanlyAcquire(self):
def test_uncleanlyAcquire(self) -> None:
"""
If a lock was held by a process which no longer exists, it can be
acquired, the C{clean} attribute is set to C{False}, and the
C{locked} attribute is set to C{True}.
"""
owner = 12345

def fakeKill(pid, signal):
def fakeKill(pid: int, signal: int) -> None:
if signal != 0:
raise OSError(errno.EPERM, None)
if pid == owner:
Expand All @@ -226,14 +228,14 @@ def fakeKill(pid, signal):

self.assertEqual(lockfile.readlink(lockf), str(os.getpid()))

def test_lockReleasedBeforeCheck(self):
def test_lockReleasedBeforeCheck(self) -> None:
"""
If the lock is initially held but then released before it can be
examined to determine if the process which held it still exists, it is
acquired and the C{clean} and C{locked} attributes are set to C{True}.
"""

def fakeReadlink(name):
def fakeReadlink(name: str) -> str:
# Pretend to be another process releasing the lock.
lockfile.rmlink(lockf)
# Fall back to the real implementation of readlink.
Expand All @@ -242,7 +244,7 @@ def fakeReadlink(name):

readlinkPatch = self.patch(lockfile, "readlink", fakeReadlink)

def fakeKill(pid, signal):
def fakeKill(pid: int, signal: int) -> None:
if signal != 0:
raise OSError(errno.EPERM, None)
if pid == 43125:
Expand All @@ -261,7 +263,7 @@ def fakeKill(pid, signal):
platform.isWindows(),
"special rename EIO handling only necessary and correct on " "Windows.",
)
def test_lockReleasedDuringAcquireSymlink(self):
def test_lockReleasedDuringAcquireSymlink(self) -> None:
"""
If the lock is released while an attempt is made to acquire
it, the lock attempt fails and C{FilesystemLock.lock} returns
Expand All @@ -271,7 +273,7 @@ def test_lockReleasedDuringAcquireSymlink(self):
RemoveDirectory) which is not atomic.
"""

def fakeSymlink(src, dst):
def fakeSymlink(src: str, dst: str) -> NoReturn:
# While another process id doing os.rmdir which the Windows
# implementation of rmlink does, a rename call will fail with EIO.
raise OSError(errno.EIO, None)
Expand All @@ -287,14 +289,14 @@ def fakeSymlink(src, dst):
platform.isWindows(),
"special readlink EACCES handling only necessary and " "correct on Windows.",
)
def test_lockReleasedDuringAcquireReadlink(self):
def test_lockReleasedDuringAcquireReadlink(self) -> None:
"""
If the lock is initially held but is released while an attempt
is made to acquire it, the lock attempt fails and
L{FilesystemLock.lock} returns C{False}.
"""

def fakeReadlink(name):
def fakeReadlink(name: str) -> NoReturn:
# While another process is doing os.rmdir which the
# Windows implementation of rmlink does, a readlink call
# will fail with EACCES.
Expand All @@ -308,8 +310,10 @@ def fakeReadlink(name):
self.assertFalse(lock.lock())
self.assertFalse(lock.locked)

def _readlinkErrorTest(self, exceptionType, errno):
def fakeReadlink(name):
def _readlinkErrorTest(
self, exceptionType: type[OSError] | type[IOError], errno: int
) -> None:
def fakeReadlink(name: str) -> NoReturn:
raise exceptionType(errno, None)

self.patch(lockfile, "readlink", fakeReadlink)
Expand All @@ -324,7 +328,7 @@ def fakeReadlink(name):
self.assertEqual(exc.errno, errno)
self.assertFalse(lock.locked)

def test_readlinkError(self):
def test_readlinkError(self) -> None:
"""
An exception raised by C{readlink} other than C{ENOENT} is passed up to
the caller of L{FilesystemLock.lock}.
Expand All @@ -336,7 +340,7 @@ def test_readlinkError(self):
platform.isWindows(),
"POSIX-specific error propagation not expected on Windows.",
)
def test_readlinkErrorPOSIX(self):
def test_readlinkErrorPOSIX(self) -> None:
"""
Any L{IOError} raised by C{readlink} on a POSIX platform passed to the
caller of L{FilesystemLock.lock}.
Expand All @@ -347,14 +351,14 @@ def test_readlinkErrorPOSIX(self):
self._readlinkErrorTest(IOError, errno.ENOSYS)
self._readlinkErrorTest(IOError, errno.EACCES)

def test_lockCleanedUpConcurrently(self):
def test_lockCleanedUpConcurrently(self) -> None:
"""
If a second process cleans up the lock after a first one checks the
lock and finds that no process is holding it, the first process does
not fail when it tries to clean up the lock.
"""

def fakeRmlink(name):
def fakeRmlink(name: str) -> None:
rmlinkPatch.restore()
# Pretend to be another process cleaning up the lock.
lockfile.rmlink(lockf)
Expand All @@ -363,7 +367,7 @@ def fakeRmlink(name):

rmlinkPatch = self.patch(lockfile, "rmlink", fakeRmlink)

def fakeKill(pid, signal):
def fakeKill(pid: int, signal: int) -> None:
if signal != 0:
raise OSError(errno.EPERM, None)
if pid == 43125:
Expand All @@ -378,18 +382,18 @@ def fakeKill(pid, signal):
self.assertTrue(lock.clean)
self.assertTrue(lock.locked)

def test_rmlinkError(self):
def test_rmlinkError(self) -> None:
"""
An exception raised by L{rmlink} other than C{ENOENT} is passed up
to the caller of L{FilesystemLock.lock}.
"""

def fakeRmlink(name):
def fakeRmlink(name: str) -> NoReturn:
raise OSError(errno.ENOSYS, None)

self.patch(lockfile, "rmlink", fakeRmlink)

def fakeKill(pid, signal):
def fakeKill(pid: int, signal: int) -> None:
if signal != 0:
raise OSError(errno.EPERM, None)
if pid == 43125:
Expand All @@ -407,14 +411,14 @@ def fakeKill(pid, signal):
self.assertEqual(exc.errno, errno.ENOSYS)
self.assertFalse(lock.locked)

def test_killError(self):
def test_killError(self) -> None:
"""
If L{kill} raises an exception other than L{OSError} with errno set to
C{ESRCH}, the exception is passed up to the caller of
L{FilesystemLock.lock}.
"""

def fakeKill(pid, signal):
def fakeKill(pid: int, signal: int) -> NoReturn:
raise OSError(errno.EPERM, None)

self.patch(lockfile, "kill", fakeKill)
Expand All @@ -429,7 +433,7 @@ def fakeKill(pid, signal):
self.assertEqual(exc.errno, errno.EPERM)
self.assertFalse(lock.locked)

def test_unlockOther(self):
def test_unlockOther(self) -> None:
"""
L{FilesystemLock.unlock} raises L{ValueError} if called for a lock
which is held by a different process.
Expand All @@ -439,7 +443,7 @@ def test_unlockOther(self):
lock = lockfile.FilesystemLock(lockf)
self.assertRaises(ValueError, lock.unlock)

def test_isLocked(self):
def test_isLocked(self) -> None:
"""
L{isLocked} returns C{True} if the named lock is currently locked,
C{False} otherwise.
Expand Down

0 comments on commit 6102c57

Please sign in to comment.