Skip to content

Commit

Permalink
Added type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
radarhere committed Jul 12, 2024
1 parent 6a9acfa commit 5bae934
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 73 deletions.
2 changes: 1 addition & 1 deletion Tests/test_file_png.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

def chunk(cid: bytes, *data: bytes) -> bytes:
test_file = BytesIO()
PngImagePlugin.putchunk(*(test_file, cid) + data)
PngImagePlugin.putchunk(test_file, cid, *data)
return test_file.getvalue()


Expand Down
6 changes: 3 additions & 3 deletions Tests/test_imagefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def test_raise_oserror(self) -> None:
def test_raise_typeerror(self) -> None:
with pytest.raises(TypeError):
parser = ImageFile.Parser()
parser.feed(1)
parser.feed(1) # type: ignore[arg-type]

def test_negative_stride(self) -> None:
with open("Tests/images/raw_negative_stride.bin", "rb") as f:
Expand Down Expand Up @@ -305,7 +305,7 @@ def test_oversize(self) -> None:
im.load()

def test_decode(self) -> None:
decoder = ImageFile.PyDecoder(None)
decoder = ImageFile.PyDecoder("")
with pytest.raises(NotImplementedError):
decoder.decode(b"")

Expand Down Expand Up @@ -383,7 +383,7 @@ def test_oversize(self) -> None:
)

def test_encode(self) -> None:
encoder = ImageFile.PyEncoder(None)
encoder = ImageFile.PyEncoder("")
with pytest.raises(NotImplementedError):
encoder.encode(0)

Expand Down
34 changes: 17 additions & 17 deletions src/PIL/ImageFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def raise_oserror(error: int) -> OSError:
raise _get_oserror(error, encoder=False)


def _tilesort(t):
def _tilesort(t) -> int:
# sort on offset
return t[2]

Expand Down Expand Up @@ -161,7 +161,7 @@ def get_format_mimetype(self) -> str | None:
return Image.MIME.get(self.format.upper())
return None

def __setstate__(self, state):
def __setstate__(self, state) -> None:
self.tile = []
super().__setstate__(state)

Expand Down Expand Up @@ -333,14 +333,14 @@ def load_end(self) -> None:
# def load_read(self, read_bytes: int) -> bytes:
# pass

def _seek_check(self, frame):
def _seek_check(self, frame: int) -> bool:
if (
frame < self._min_frame
# Only check upper limit on frames if additional seek operations
# are not required to do so
or (
not (hasattr(self, "_n_frames") and self._n_frames is None)
and frame >= self.n_frames + self._min_frame
and frame >= getattr(self, "n_frames") + self._min_frame
)
):
msg = "attempt to seek outside sequence"
Expand Down Expand Up @@ -370,15 +370,15 @@ def _open(self) -> None:
msg = "StubImageFile subclass must implement _open"
raise NotImplementedError(msg)

def load(self):
def load(self) -> Image.core.PixelAccess | None:
loader = self._load()
if loader is None:
msg = f"cannot find loader for this {self.format} file"
raise OSError(msg)
image = loader.load(self)
assert image is not None
# become the other object (!)
self.__class__ = image.__class__
self.__class__ = image.__class__ # type: ignore[assignment]
self.__dict__ = image.__dict__
return image.load()

Expand All @@ -396,8 +396,8 @@ class Parser:

incremental = None
image: Image.Image | None = None
data = None
decoder = None
data: bytes | None = None
decoder: Image.core.ImagingDecoder | PyDecoder | None = None
offset = 0
finished = 0

Expand All @@ -409,7 +409,7 @@ def reset(self) -> None:
"""
assert self.data is None, "cannot reuse parsers"

def feed(self, data):
def feed(self, data: bytes) -> None:
"""
(Consumer) Feed data to the parser.
Expand Down Expand Up @@ -491,7 +491,7 @@ def __enter__(self) -> Parser:
def __exit__(self, *args: object) -> None:
self.close()

def close(self):
def close(self) -> Image.Image:
"""
(Consumer) Close the stream.
Expand Down Expand Up @@ -525,7 +525,7 @@ def close(self):
# --------------------------------------------------------------------


def _save(im, fp, tile, bufsize=0) -> None:
def _save(im, fp, tile, bufsize: int = 0) -> None:
"""Helper to save image based on tile list
:param im: Image object.
Expand Down Expand Up @@ -553,7 +553,7 @@ def _save(im, fp, tile, bufsize=0) -> None:
fp.flush()


def _encode_tile(im, fp, tile: list[_Tile], bufsize, fh, exc=None):
def _encode_tile(im, fp, tile: list[_Tile], bufsize: int, fh, exc=None) -> None:
for encoder_name, extents, offset, args in tile:
if offset > 0:
fp.seek(offset)
Expand Down Expand Up @@ -629,18 +629,18 @@ def extents(self) -> tuple[int, int, int, int]:
class PyCodec:
fd: IO[bytes] | None

def __init__(self, mode, *args):
self.im = None
def __init__(self, mode: str, *args: Any) -> None:
self.im: Image.core.ImagingCore | None = None
self.state = PyCodecState()
self.fd = None
self.mode = mode
self.init(args)

def init(self, args) -> None:
def init(self, args: tuple[Any, ...]) -> None:
"""
Override to perform codec specific initialization
:param args: Array of args items from the tile entry
:param args: Tuple of arg items from the tile entry
:returns: None
"""
self.args = args
Expand All @@ -662,7 +662,7 @@ def setfd(self, fd) -> None:
"""
self.fd = fd

def setimage(self, im, extents: tuple[int, int, int, int] | None = None) -> None:
def setimage(self, im, extents=None):
"""
Called from ImageFile to set the core output image for the codec
Expand Down
2 changes: 1 addition & 1 deletion src/PIL/ImageSequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class Iterator:
:param im: An image object.
"""

def __init__(self, im: Image.Image):
def __init__(self, im: Image.Image) -> None:
if not hasattr(im, "seek"):
msg = "im must have seek method"
raise AttributeError(msg)
Expand Down
4 changes: 2 additions & 2 deletions src/PIL/JpegImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -827,11 +827,11 @@ def _save_cjpeg(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:

##
# Factory for making JPEG and MPO instances
def jpeg_factory(fp=None, filename=None):
def jpeg_factory(fp: IO[bytes] | None = None, filename: str | bytes | None = None):
im = JpegImageFile(fp, filename)
try:
mpheader = im._getmp()
if mpheader[45057] > 1:
if mpheader is not None and mpheader[45057] > 1:
for segment, content in im.applist:
if segment == "APP1" and b' hdrgm:Version="' in content:
# Ultra HDR images are not yet supported
Expand Down
20 changes: 10 additions & 10 deletions src/PIL/PdfImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,15 @@ def _write_image(im, filename, existing_pdf, image_refs):
return image_ref, procset


def _save(im, fp, filename, save_all=False):
def _save(
im: Image.Image, fp: IO[bytes], filename: str | bytes, save_all: bool = False
) -> None:
is_appending = im.encoderinfo.get("append", False)
filename_str = filename.decode() if isinstance(filename, bytes) else filename
if is_appending:
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename, mode="r+b")
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename_str, mode="r+b")
else:
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename, mode="w+b")
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename_str, mode="w+b")

dpi = im.encoderinfo.get("dpi")
if dpi:
Expand Down Expand Up @@ -228,12 +231,7 @@ def _save(im, fp, filename, save_all=False):
for im in ims:
im_number_of_pages = 1
if save_all:
try:
im_number_of_pages = im.n_frames
except AttributeError:
# Image format does not have n_frames.
# It is a single frame image
pass
im_number_of_pages = getattr(im, "n_frames", 1)
number_of_pages += im_number_of_pages
for i in range(im_number_of_pages):
image_refs.append(existing_pdf.next_object_id(0))
Expand All @@ -250,7 +248,9 @@ def _save(im, fp, filename, save_all=False):

page_number = 0
for im_sequence in ims:
im_pages = ImageSequence.Iterator(im_sequence) if save_all else [im_sequence]
im_pages: ImageSequence.Iterator | list[Image.Image] = (
ImageSequence.Iterator(im_sequence) if save_all else [im_sequence]
)
for im in im_pages:
image_ref, procset = _write_image(im, filename, existing_pdf, image_refs)

Expand Down
36 changes: 19 additions & 17 deletions src/PIL/PngImagePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def _safe_zlib_decompress(s):
return plaintext


def _crc32(data, seed=0):
def _crc32(data: bytes, seed: int = 0) -> int:
return zlib.crc32(data, seed) & 0xFFFFFFFF


Expand Down Expand Up @@ -191,7 +191,7 @@ def push(self, cid: bytes, pos: int, length: int) -> None:
assert self.queue is not None
self.queue.append((cid, pos, length))

def call(self, cid, pos, length):
def call(self, cid: bytes, pos: int, length: int) -> bytes:
"""Call the appropriate chunk handler"""

logger.debug("STREAM %r %s %s", cid, pos, length)
Expand Down Expand Up @@ -1091,21 +1091,21 @@ def getexif(self) -> Image.Exif:
}


def putchunk(fp, cid, *data):
def putchunk(fp: IO[bytes], cid: bytes, *data: bytes) -> None:
"""Write a PNG chunk (including CRC field)"""

data = b"".join(data)
byte_data = b"".join(data)

fp.write(o32(len(data)) + cid)
fp.write(data)
crc = _crc32(data, _crc32(cid))
fp.write(o32(len(byte_data)) + cid)
fp.write(byte_data)
crc = _crc32(byte_data, _crc32(cid))
fp.write(o32(crc))


class _idat:
# wrap output from the encoder in IDAT chunks

def __init__(self, fp, chunk):
def __init__(self, fp, chunk) -> None:
self.fp = fp
self.chunk = chunk

Expand All @@ -1116,7 +1116,7 @@ def write(self, data: bytes) -> None:
class _fdat:
# wrap encoder output in fdAT chunks

def __init__(self, fp, chunk, seq_num):
def __init__(self, fp: IO[bytes], chunk, seq_num: int) -> None:
self.fp = fp
self.chunk = chunk
self.seq_num = seq_num
Expand Down Expand Up @@ -1259,7 +1259,9 @@ def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
_save(im, fp, filename, save_all=True)


def _save(im, fp, filename, chunk=putchunk, save_all=False):
def _save(
im: Image.Image, fp, filename: str | bytes, chunk=putchunk, save_all: bool = False
) -> None:
# save an image to disk (called by the save method)

if save_all:
Expand Down Expand Up @@ -1461,7 +1463,7 @@ def _save(im, fp, filename, chunk=putchunk, save_all=False):
# PNG chunk converter


def getchunks(im, **params):
def getchunks(im: Image.Image, **params: Any) -> list[tuple[bytes, bytes, bytes]]:
"""Return a list of PNG chunks representing this image."""

class collector:
Expand All @@ -1470,19 +1472,19 @@ class collector:
def write(self, data: bytes) -> None:
pass

def append(self, chunk: bytes) -> None:
def append(self, chunk: tuple[bytes, bytes, bytes]) -> None:
self.data.append(chunk)

def append(fp, cid, *data):
data = b"".join(data)
crc = o32(_crc32(data, _crc32(cid)))
fp.append((cid, data, crc))
def append(fp: collector, cid: bytes, *data: bytes) -> None:
byte_data = b"".join(data)
crc = o32(_crc32(byte_data, _crc32(cid)))
fp.append((cid, byte_data, crc))

fp = collector()

try:
im.encoderinfo = params
_save(im, fp, None, append)
_save(im, fp, "", append)
finally:
del im.encoderinfo

Expand Down
Loading

0 comments on commit 5bae934

Please sign in to comment.