# PNG Parser in pure python

A monolithic, a bit bloated, questionable implementations, and non performant png data structure parser built in python with minimal external library dependencies.

References:
- [libpng file specification](http://www.libpng.org/pub/png/spec/1.2/PNG-Structure.html)
- [w3c file specification](https://www.w3.org/TR/2003/REC-PNG-20031110/) 
- [ExifTools - other non-standard png chunk type tags reference](https://exiftool.org/TagNames/PNG.html)

## Imports

In [1]:
from re import compile as re_compile
from pathlib import Path
from dataclasses import dataclass, field
from io import BytesIO
import struct
import os
import time

## Constants

### png and parser definitions

In [2]:
PNG_SIGNATURE = bytearray([137, 80, 78, 71, 13, 10, 26, 10])
NULL_SEP = b'\0' # or b'\x00'

# list of sRGB chunk intent values
LS_PNG_CT_sRGB_INTENT_V = [
    'Perceptual',
    'Relative colorimetric',
    'Saturation',
    'Absolute colorimetric'
]

### Optional options

In [3]:
VERBOSE = True

SAVE_PRINT_STDOUT = True
SAVE_PRINT_PRUNE = True
SAVE_PRINT_FNAME = 'stdout.txt'

### Tests definitions

*keep in mind* this is not unit test, it just a generic test to see the ability to parse with suite of png test file. no errors will be thrown.


#### > Schaik Standard Test Suite

Test used are suites provided by Schaik at [schaik.com](http://www.schaik.com/pngsuite/) 
/ [suite webpage](http://www.libpng.org/pub/png/pngsuite.html) 
/ [license](http://www.schaik.com/pngsuite/PngSuite.LICENSE)

note: if STD_TEST_INPUT_TEST is false, then this program will try to fetch any png files from the jupyter file root directory

In [4]:
STD_TEST_INPUT_TEST = True
STD_TEST_INPUT_FOLDER = 'test-input'
STD_TEST_INPUT_CURL = 'http://www.schaik.com/pngsuite/PngSuite-2017jul19.zip' 
STD_TEST_INPUT_SCHAIK_EXCERPT = 'libpng-schaik-excerpt.txt'
STD_TEST_INPUT_SCHAIK_EXCERPT_CURL = 'https://raw.githubusercontent.com/previoip/jupyter-png-parser/main/libpng-schaik-excerpt.txt'

#### > Flush to File test

In [5]:
TEST_FLUSH_TO_FS = True
TEST_FLUSH_FOLDER = 'export'
TEST_REPACK = True

## Functions and Wrrrapers

In [6]:
# unpack big endian bytes into signed or unsigned integer 
# rtype: int
def _p_uint(b):
  return int.from_bytes(b, byteorder='big', signed=False)

def _p_int(b):
  return int.from_bytes(b, byteorder='big', signed=True)

In [7]:
# checks whether file object has reached end of file/line. I'll admit this is questionable at best.
# rtype: bool
def at_eof(fo):
    # lmao
    c = fo.read(1)
    fo.seek(-1, 1)
    if not c:
        return True
    return False

# splice bytes until first instance of null bytes is found. Returns left side of bytes and remainder bytes
# rtype: Tuple[bytes, bytes]
def splice_null_sep(b: bytes):
    i = b.find(NULL_SEP)
    if i == -1:
        return b, b''
    return b[:i], b[i+1:]

In [8]:
# checks whether filepath leads to file with png extension
# rtype: bool

re_png_ext = re_compile(r'.*(?:\.(?:p|P)(?:n|N)(?:g|G))(?:\n|\Z)')

def is_path_png(path):
    r = re_png_ext.search(path)
    if r:
        return True
    return False


In [9]:
# print function alternative with the ability to write to a file and print to standard output. Questionable and bad overall since fileio is slow.

suppress_print_w = False

def print_w(*args, fp=SAVE_PRINT_FNAME, **kwargs):
    # lmfao
    if SAVE_PRINT_STDOUT:
        with open(fp, 'a') as fo:
            print(*args, file=fo, **kwargs)
    if not suppress_print_w:
        print(*args, **kwargs)

In [10]:
# get python information and program run timestamp
def get_python_stat():
    import sys
    import datetime
    r = ''
    r += 'python version : ' + sys.version + '\n'
    r += 'prog timestamp : ' + datetime.datetime.utcnow().isoformat() + '\n'
    return r

In [11]:
# if test with Schaik suite is True, then the excerpt is used to get the test file description

ls_png_test_fname_excerpt = {}

def load_png_test_fname_preset():
    if not STD_TEST_INPUT_SCHAIK_EXCERPT:
        return
    if not os.path.exists(STD_TEST_INPUT_SCHAIK_EXCERPT):
        return
    with open(STD_TEST_INPUT_SCHAIK_EXCERPT, 'r') as fo:
        for line in fo.readlines():
            if line.startswith('        '):
                r = [i.strip() for i in line.split('-')]
                if len(r) != 2:
                    continue
                kw, v = r
                ls_png_test_fname_excerpt[kw] = v

def parse_png_test_fname_preset(path: Path):
    if not STD_TEST_INPUT_TEST:
        return ''

    if not ls_png_test_fname_excerpt:
        load_png_test_fname_preset()

    if isinstance(path, Path):
        name = path.name
    elif isinstance(path, str):
        name = os.path.split(path)[-1]
    name = name.split('.')[0]

    r = ls_png_test_fname_excerpt.get(name)
    if not r:
        return 'not documented'
    return r


In [12]:
# crc implementation http://www.libpng.org/pub/png/spec/1.2/PNG-CRCAppendix.html

crc_table = [None for _ in range(256)]
is_crc_table_computed = False

def make_crc_table():
    for n in range(256):
        c = n
        for _ in range(8):
            if c & 1:
                c = 0xedb88320 ^ (c >> 1)
            else:
                c = c >> 1
        crc_table[n] = c
    is_crc_table_computed = True


def update_crc_table(crc_cksum, msg):
    c = crc_cksum

    if not is_crc_table_computed:
        make_crc_table()
    
    for b in msg:
        c = crc_table[(c ^ b) & 0xff] ^ (c >> 8)
    return c

def crc(msg: bytes):
    return update_crc_table(0xffffffff, msg) ^ 0xffffffff

def crc_to_bytes(msg: bytes):
    return struct.pack('>L', crc(msg))


# or alternatively (instead of all of these nonsense) use zlib crc instead
#
# def crc(msg: bytes):
#     from zlib import crc32
#     return crc32(msg, 0)
#
# or other sophisticated wrapper such as the ones discussed in this thread
# https://stackoverflow.com/questions/1742866/compute-crc-of-file-in-python

## Main Program, Chunk Dataclass, and Parser Class

### Inits

In [13]:
# if SAVE_PRINT is defined

# 1. delete txt file if exist
if SAVE_PRINT_PRUNE:
    if os.path.exists(SAVE_PRINT_FNAME) and os.path.isfile(SAVE_PRINT_FNAME):
        os.remove(SAVE_PRINT_FNAME)

# 2. re-instantiate file with stat on the head of the txt file
if SAVE_PRINT_STDOUT and not os.path.exists(SAVE_PRINT_FNAME):
    with open(SAVE_PRINT_FNAME, 'w') as fo:
        fo.write(get_python_stat())
        fo.write('\n\n')


# get current working directory
cwd = Path('.')


# if TEST_FLUSH is defined: create the output folder as current jupyter directory
if TEST_FLUSH_TO_FS:
  tp = cwd / TEST_FLUSH_FOLDER
  if not tp.exists():
    tp.mkdir()

# if STD_TEST_INPUT is defined
if STD_TEST_INPUT_TEST:

    # 1. change cwd to test input folder and create folder if does not exist
    cwd /= STD_TEST_INPUT_FOLDER
    if not cwd.exists(): cwd.mkdir()

    # 2. if no png files present at the folder, fetch zip file from the web
    if not list(cwd.glob('*.png')):
        import requests
        import zipfile

        r = requests.get(STD_TEST_INPUT_CURL)
        r.raise_for_status()
        buf = BytesIO(r.content)
        with zipfile.ZipFile(buf) as zf:
            zf.extractall(STD_TEST_INPUT_FOLDER)

target_files = list(cwd.glob('*.png'))
target_files[:10], target_files[-10:]

([WindowsPath('test-input/basi0g01.png'),
  WindowsPath('test-input/basi0g02.png'),
  WindowsPath('test-input/basi0g04.png'),
  WindowsPath('test-input/basi0g08.png'),
  WindowsPath('test-input/basi0g16.png'),
  WindowsPath('test-input/basi2c08.png'),
  WindowsPath('test-input/basi2c16.png'),
  WindowsPath('test-input/basi3p01.png'),
  WindowsPath('test-input/basi3p02.png'),
  WindowsPath('test-input/basi3p04.png')],
 [WindowsPath('test-input/xhdn0g08.png'),
  WindowsPath('test-input/xlfn0g04.png'),
  WindowsPath('test-input/xs1n0g01.png'),
  WindowsPath('test-input/xs2n0g01.png'),
  WindowsPath('test-input/xs4n0g01.png'),
  WindowsPath('test-input/xs7n0g01.png'),
  WindowsPath('test-input/z00n2c08.png'),
  WindowsPath('test-input/z03n2c08.png'),
  WindowsPath('test-input/z06n2c08.png'),
  WindowsPath('test-input/z09n2c08.png')])

### Chunk Type Tag Enum

In [14]:
class _BaseEnum: pass

def enum_fetch_all(enum: _BaseEnum):
  _reserved = list(dir(object)) +  ['__dict__', '__module__', '__weakref__']
  return [getattr(enum, i) for i in dir(enum) if i not in _reserved]

class ENUM_PNG_CT_BYTES(_BaseEnum):

    # Critical chunk types 
    IHDR = b'IHDR'
    PLTE = b'PLTE'
    IDAT = b'IDAT'
    IEND = b'IEND'

    # Ancillary chunk types
    cHRM = b'cHRM'    # Before PLTE and IDAT
    gAMA = b'gAMA'    # Before PLTE and IDAT
    iCCP = b'iCCP'    # Before PLTE and IDAT
    sBIT = b'sBIT'    # Before PLTE and IDAT
    sRGB = b'sRGB'    # Before PLTE and IDAT
    bKGD = b'bKGD'    # After PLTE; before IDAT
    hIST = b'hIST'    # After PLTE; before IDAT
    tRNS = b'tRNS'    # After PLTE; before IDAT
    pHYs = b'pHYs'    # Before IDAT
    sPLT = b'sPLT'    # Before IDAT
    tIME = b'tIME'
    iTXt = b'iTXt'
    tEXt = b'tEXt'
    zTXt = b'zTXt'

In [15]:
enum_fetch_all(ENUM_PNG_CT_BYTES)

[b'IDAT',
 b'IEND',
 b'IHDR',
 b'PLTE',
 b'bKGD',
 b'cHRM',
 b'gAMA',
 b'hIST',
 b'iCCP',
 b'iTXt',
 b'pHYs',
 b'sBIT',
 b'sPLT',
 b'sRGB',
 b'tEXt',
 b'tIME',
 b'tRNS',
 b'zTXt']

### Dataclasses

In [16]:
class PNGImage: pass

class I_ChunkTemplate: pass

class CT_IHDR(I_ChunkTemplate): pass
class CT_PLTE(I_ChunkTemplate): pass
class CT_PLTE(I_ChunkTemplate): pass
class CT_IDAT(I_ChunkTemplate): pass
class CT_IEND(I_ChunkTemplate): pass

class CT_cHRM(I_ChunkTemplate): pass
class CT_gAMA(I_ChunkTemplate): pass
class CT_iCCP(I_ChunkTemplate): pass
class CT_sBIT(I_ChunkTemplate): pass
class CT_sRGB(I_ChunkTemplate): pass
class CT_bKGD(I_ChunkTemplate): pass
class CT_hIST(I_ChunkTemplate): pass
class CT_tRNS(I_ChunkTemplate): pass
class CT_pHYs(I_ChunkTemplate): pass
class CT_sPLT(I_ChunkTemplate): pass
class CT_tIME(I_ChunkTemplate): pass
class CT_iTXt(I_ChunkTemplate): pass
class CT_tEXt(I_ChunkTemplate): pass
class CT_zTXt(I_ChunkTemplate): pass

class C_RGB: pass
class C_RGBA: pass

In [17]:
# color dataclass

@dataclass
class C_RGB:
    r: int = 0
    g: int = 0
    b: int = 0

@dataclass
class C_RGBA(C_RGB):
    a: int = 0xff


# random structs for certain chunk tags idk

@dataclass
class E_sBIT(C_RGBA): pass

@dataclass
class E_sPLT:
    color_space  : C_RGBA = C_RGBA(None, None, None, None)
    frequency    : int = None

### Tags Implementations (the meat of this notebook)

In [18]:
# chunk type template (interface) with some custom method

@dataclass(repr=False)
class I_ChunkTemplate:
    _png_instance   : PNGImage  = None
    is_parsed       : bool      = False
    chunk_size      : int       = 0
    chunk_type      : bytes     = b''
    chunk_data      : BytesIO   = None
    chunk_crc       : bytes     = b''


    def set_pngImageInstance(self, instance):
        self._png_instance = instance

    def _parser(self):
        print_w(f'WARNING! chunk type {self.chunk_type} parser method is not yet overridden. skipping procedure.')
        return 1

    def _test(self):
        # print_w(f'chunk {self.chunk_type} test method is not yet overridden.')
        return

    def parse(self):
        if self.is_parsed:
            return 0
        if self._parser():
            return 1

        try:
            self._test()
        except AssertionError as e:
            print_w('WARNING! AssertionError:', e)
            return 1

        self.is_parsed = True

    def _pack(self):
        return f'WARNING! chunk type {self.chunk_type} pack method is not yet overridden. skipping procedure.'

    def repack(self):
        self.parse()
        temp_chunk_data = self.chunk_data
        self.chunk_data = BytesIO()
    

        if self.chunk_type == ENUM_PNG_CT_BYTES.IDAT:
            self.chunk_data = temp_chunk_data
            return

        try:
            errmsg = self._pack()
            if errmsg:
                raise RuntimeError(errmsg)
        except Exception as e:
            print_w('WARNING! Exception Raised:', e)
            self.chunk_data = temp_chunk_data
            return 1

        self.chunk_data.seek(0)

    def __repr__(self):
        i = [(k, v) for k, v in self.__dict__.items() if not k.startswith('_') and not k.endswith('data')]
        j = [(k, f"'{v}'") if isinstance(v, str) else (k, v) for k, v in i]
        l = [f'{k}={v}' for k, v in j]
        return '%s(%s)' % (self.__class__.__name__, ', '.join(l))

In [19]:
@dataclass(repr=False)
class CT_IHDR(I_ChunkTemplate):
    width               : int = None
    height              : int = None
    bit_depth           : int = None
    color_type          : int = None
    compression_method  : int = None
    filter_method       : int = None
    interlace_method    : int = None
    color_type_info     : str = ''

    def _parser(self):
        self.width              = _p_uint(self.chunk_data.read(4))
        self.height             = _p_uint(self.chunk_data.read(4))
        self.bit_depth          = _p_int(self.chunk_data.read(1))
        self.color_type         = _p_int(self.chunk_data.read(1))
        self.compression_method = _p_int(self.chunk_data.read(1))
        self.filter_method      = _p_int(self.chunk_data.read(1))
        self.interlace_method   = _p_int(self.chunk_data.read(1))
        if self.color_type & 1:
            self.color_type_info += 'palette-used;'
        if self.color_type & 2:
            self.color_type_info += 'color-used;'
        if self.color_type & 4:
            self.color_type_info += 'alpha-channel-used;'

        if self.color_type_info:
            self.color_type_info = self.color_type_info[:-1]

    def _test(self):
        assert self.width != 0, 'invalid value: width is zero'
        assert self.height != 0, 'invalid value: height is zero'

        color_type_msg = f'invalid value: color type {self.color_type} and/or bit depth {self.bit_depth} does not match specification'

        assert self.color_type in [0, 1, 2, 3, 4, 6], color_type_msg + ': valid color type values => [0, 1, 2, 3, 4, 6]'

        if self.color_type == 0:
            assert self.bit_depth in [1, 2, 4, 8, 16], color_type_msg + ': valid color type values => [1, 2, 4, 8, 16]'

        elif self.color_type == 3:
            assert self.bit_depth in [1, 2, 4, 8], color_type_msg + ': valid color type values => [1, 2, 4, 8]'

        elif self.color_type == 2 \
            or self.color_type == 4 \
            or self.color_type == 6:
            assert self.bit_depth in [8, 16], color_type_msg + ': valid color type values => [8, 16]'

    def _pack(self):
        self.chunk_data.write(
            struct.pack('>LLBBBBB', 
                self.width,
                self.height,
                self.bit_depth,
                self.color_type,
                self.compression_method,
                self.filter_method,
                self.interlace_method
            )
        )


@dataclass(repr=False)
class CT_PLTE(I_ChunkTemplate):
    entries : list = field(default_factory=list)
    
    def _parser(self):
        assert self.chunk_size % 3 == 0, 'invalid value: chunk length is not divisible by 3'
        l = self.chunk_size // 3
        for _ in range(l):
            c_rgb = C_RGB()
            c_rgb.r = _p_uint(self.chunk_data.read(1))
            c_rgb.g = _p_uint(self.chunk_data.read(1))
            c_rgb.b = _p_uint(self.chunk_data.read(1))

            self.entries.append(c_rgb)

    def _pack(self):
        for c in self.entries:
            self.chunk_data.write(
                struct.pack('>BBB', 
                    c.r,
                    c.g,
                    c.b
                )
            )


@dataclass(repr=False)
class CT_IDAT(I_ChunkTemplate):

    def _parser(self):
        pass

    def _pack(self):
        pass


@dataclass(repr=False)
class CT_IEND(I_ChunkTemplate):

    def _parser(self):
        pass

    def _pack(self):
        pass


@dataclass(repr=False)
class CT_cHRM(I_ChunkTemplate):
    white_point_x   : int = 0
    white_point_y   : int = 0
    red_x           : int = 0
    red_y           : int = 0
    green_x         : int = 0
    green_y         : int = 0
    blue_x          : int = 0
    blue_y          : int = 0

    def _parser(self):
        self.white_point_x = _p_uint(self.chunk_data.read(4))
        self.white_point_y = _p_uint(self.chunk_data.read(4))
        self.red_x         = _p_uint(self.chunk_data.read(4))
        self.red_y         = _p_uint(self.chunk_data.read(4))
        self.green_x       = _p_uint(self.chunk_data.read(4))
        self.green_y       = _p_uint(self.chunk_data.read(4))
        self.blue_x        = _p_uint(self.chunk_data.read(4))
        self.blue_y        = _p_uint(self.chunk_data.read(4))

    def _pack(self):
        self.chunk_data.write(
            struct.pack('>LLLLLLL',
                self.white_point_x,
                self.red_x,
                self.red_y,
                self.green_x,
                self.green_y,
                self.blue_x,
                self.blue_y
            )
        )


@dataclass(repr=False)
class CT_gAMA(I_ChunkTemplate):
    gamma: int = 0

    def _parser(self):
        self.gamma = _p_uint(self.chunk_data.read(4))

    def _pack(self):
        self.chunk_data.write(struct.pack('>L', self.gamma))


@dataclass(repr=False)
class CT_iCCP(I_ChunkTemplate):
    profile_name        : bytes = None
    compression_method  : bytes = None
    compression_profile : bytes = None

    def _parser(self):
        chunk = self.chunk_data.read()

        self.profile_name, compression_info = splice_null_sep(chunk)
        self.compression_method = compression_info[0:1]
        self.compression_profile = compression_info[1:]

    def _pack(self):
        self.chunk_data.write(self.profile_name)
        self.chunk_data.write(NULL_SEP)
        self.chunk_data.write(self.compression_method)
        self.chunk_data.write(self.compression_profile)


@dataclass(repr=False)
class CT_sBIT(I_ChunkTemplate):
    sbit : E_sBIT = E_sBIT(None, None, None, None)

    def _parser(self):

        ct_IHDR = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.IHDR, do_parse=True)
        color_type = ct_IHDR.color_type

        if color_type == 0:
            sbit_grey = _p_uint(self.chunk_data.read(1))
            self.sbit.r = sbit_grey
            self.sbit.g = sbit_grey
            self.sbit.b = sbit_grey
        elif color_type == 2 or color_type == 3:
            self.sbit.r = _p_uint(self.chunk_data.read(1))
            self.sbit.g = _p_uint(self.chunk_data.read(1))
            self.sbit.b = _p_uint(self.chunk_data.read(1))
        elif color_type == 4:
            sbit_grey = _p_uint(self.chunk_data.read(1))
            self.sbit.r = sbit_grey
            self.sbit.g = sbit_grey
            self.sbit.b = sbit_grey
            self.sbit.a = _p_uint(self.chunk_data.read(1))
        elif color_type == 6:
            self.sbit.r = _p_uint(self.chunk_data.read(1))
            self.sbit.g = _p_uint(self.chunk_data.read(1))
            self.sbit.b = _p_uint(self.chunk_data.read(1))
            self.sbit.a = _p_uint(self.chunk_data.read(1))

    def _pack(self):
        ct_IHDR = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.IHDR, do_parse=True)
        color_type = ct_IHDR.color_type
        if color_type == 0:
            self.chunk_data.write(struct.pack('>B', self.sbit.r))
        elif color_type == 2 or color_type == 3:
            self.chunk_data.write(
                struct.pack('>BBB', 
                    self.sbit.r,
                    self.sbit.g,
                    self.sbit.b
                )
            )
        elif color_type == 4:
            self.chunk_data.write(
                struct.pack('>BB', 
                    self.sbit.r,
                    self.sbit.a
                )
            )
        elif color_type == 6:
            self.chunk_data.write(
                struct.pack('>BBBB', 
                    self.sbit.r,
                    self.sbit.g,
                    self.sbit.b,
                    self.sbit.a
                )
            )


@dataclass(repr=False)
class CT_sRGB(I_ChunkTemplate):
    rendering_intent : int = None
    rendering_intent_value : str = None
    
    def _parser(self):
      self.rendering_intent = _p_uint(self.chunk_data.read(1))
      self.rendering_intent_value = LS_PNG_CT_sRGB_INTENT_V[self.rendering_intent]

    def _pack(self):
        self.chunk_data.write(struct.pack('>B', self.rendering_intent))


@dataclass(repr=False)
class CT_bKGD(I_ChunkTemplate):
    background_color : C_RGB = C_RGB()
    chunk_pLTE_index : int = None

    def _parser(self):

        ct_IHDR = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.IHDR, do_parse=True)
        color_type = ct_IHDR.color_type

        if color_type == 3:
            ct_PLTE = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.PLTE, do_parse=True)
            self.chunk_pLTE_index = _p_uint(self.chunk_data.read(1))
            c = ct_PLTE.entries[self.chunk_pLTE_index]
            self.background_color.r = c.r
            self.background_color.g = c.g
            self.background_color.b = c.b
        elif color_type == 0 or color_type == 4:
            c = _p_uint(self.chunk_data.read(2))
            self.background_color.r = c
            self.background_color.g = c
            self.background_color.b = c
        elif color_type == 2 or color_type == 6:
            self.background_color.r = _p_uint(self.chunk_data.read(2))
            self.background_color.g = _p_uint(self.chunk_data.read(2))
            self.background_color.b = _p_uint(self.chunk_data.read(2))

    def _pack(self):
        ct_IHDR = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.IHDR, do_parse=True)
        color_type = ct_IHDR.color_type
        if color_type == 3:
            self.chunk_data.write(struct.pack('>B', self.chunk_pLTE_index))
        elif color_type == 0 or color_type == 4:
            self.chunk_data.write(struct.pack('>H', self.background_color.r))
        elif color_type == 2 or color_type == 6:
            self.chunk_data.write(
                struct.pack('>HHH',
                    self.background_color.r,
                    self.background_color.g,
                    self.background_color.b
                )
            )


@dataclass(repr=False)
class CT_hIST(I_ChunkTemplate):
    entries : list = field(default_factory=list)

    def _parser(self):
        l = self.chunk_size // 2
        for _ in range(l):
            self.entries.append(_p_uint(self.chunk_data.read(2)))

    def _test(self):
        ct_PLTE = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.PLTE, do_parse=True)
        assert len(ct_PLTE.entries) == len(self.entries), 'invalid value: chunk hIST does not have equal length with PLTE'

    def _pack(self):
        for i in self.entries:
            self.chunk_data.write(struct.pack('>H', i))


@dataclass(repr=False)
class CT_tRNS(I_ChunkTemplate):
    alpha_channel : list = field(default_factory=list)

    def _parser(self):
        ct_IHDR = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.IHDR, do_parse=True)
        bit_depth = ct_IHDR.bit_depth
        color_type = ct_IHDR.color_type

        if color_type == 0:
            self.alpha_channel.append(
                _p_uint(self.chunk_data.read(2))
            )

        if color_type == 2:
            c_rgb = C_RGB()
            c_rgb.r = _p_uint(self.chunk_data.read(2))
            c_rgb.g = _p_uint(self.chunk_data.read(2))
            c_rgb.b = _p_uint(self.chunk_data.read(2))
            self.alpha_channel.append(
                c_rgb
            )

        if color_type == 3:
            for _ in range(self.chunk_size):
                self.alpha_channel.append(
                    _p_uint(self.chunk_data.read(1))
                )

    def _test(self):
        ct_IHDR = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.IHDR)
        color_type = ct_IHDR.color_type
        assert color_type not in [4, 6], f'invalid value: {self.chunk_type} prohibited for color types 4 and 6'

    def _pack(self):
        ct_IHDR = self._png_instance.get_chunk(ENUM_PNG_CT_BYTES.IHDR, do_parse=True)
        bit_depth = ct_IHDR.bit_depth
        color_type = ct_IHDR.color_type
        if color_type == 0 or color_type == 3:
            for i in self.alpha_channel:
                self.chunk_data.write(struct.pack('>B', i))
        elif color_type == 2:
            for c in self.alpha_channel:
                self.chunk_data.write(
                    struct.pack('>BBB', 
                        c.r,
                        c.g,
                        c.b
                    )
                )


@dataclass(repr=False)
class CT_pHYs(I_ChunkTemplate):
    pixel_per_unit_x : int = None
    pixel_per_unit_y : int = None
    unit_specifier   : int = None

    def _parser(self):
        self.pixel_per_unit_x = _p_uint(self.chunk_data.read(4))
        self.pixel_per_unit_y = _p_uint(self.chunk_data.read(4))
        self.unit_specifier = _p_uint(self.chunk_data.read(1))

    def _test(self):
        assert self.unit_specifier in [0, 1]

    def _pack(self):
        self.chunk_data.write(
            struct.pack('>LLB',
                self.pixel_per_unit_x,
                self.pixel_per_unit_y,
                self.unit_specifier
            )
        )



@dataclass(repr=False)
class CT_sPLT(I_ChunkTemplate):
    palette_name : bytes = None
    sample_depth : int = None
    entries      : list = field(default_factory=list)

    def _parser(self):
        chunk = self.chunk_data.read()
        self.palette_name, chunk = splice_null_sep(chunk)
        self.sample_depth, chunk = _p_uint(chunk[:1]), chunk[1:]
        n = 0
        l = 0
        if self.sample_depth == 8:
            n = 1
            l = len(chunk) // 6
        elif self.sample_depth == 16:
            n = 2
            l = len(chunk) // 10

        for _ in range(l):
            e_sPLT = E_sPLT()
            e_sPLT.color_space.r, chunk = _p_uint(chunk[:n]), chunk[n:]
            e_sPLT.color_space.g, chunk = _p_uint(chunk[:n]), chunk[n:]
            e_sPLT.color_space.b, chunk = _p_uint(chunk[:n]), chunk[n:]
            e_sPLT.color_space.a, chunk = _p_uint(chunk[:n]), chunk[n:]
            e_sPLT.frequency, chunk = _p_uint(chunk[:2]), chunk[2:]
            self.entries.append(e_sPLT)

            if not chunk:
                break

    def _pack(self):
        self.chunk_data.write(palette_name)
        self.chunk_data.write(struct.pack('>B', sample_depth))

        n = 0
        l = 0
        if self.sample_depth == 8:
            n = 1
        elif self.sample_depth == 16:
            n = 2

        for e in self.entries:
            fmt = ''
            if n == 1:
                fmt = 'B'
            elif n == 2:
                fmt = 'H'
            else:
                raise ValueError('huh')

            self.chunk_data.write(
                struct.pack(f'>{fmt*4}H', 
                    e.color_space.r,
                    e.color_space.g,
                    e.color_space.b,
                    e.color_space.a,
                    e.frequency
                )
            )


@dataclass(repr=False)
class CT_tIME(I_ChunkTemplate):
    year    : int = None
    month   : int = None
    day     : int = None
    hour    : int = None
    minute  : int = None
    second  : int = None

    def _parser(self):
        self.year   = _p_uint(self.chunk_data.read(2))
        self.month  = _p_uint(self.chunk_data.read(1))
        self.day    = _p_uint(self.chunk_data.read(1))
        self.hour   = _p_uint(self.chunk_data.read(1))
        self.minute = _p_uint(self.chunk_data.read(1))
        self.second = _p_uint(self.chunk_data.read(1))

    def _test(self):
        assert self.month  in range(1, 13)
        assert self.day    in range(1, 32)
        assert self.hour   in range(24)
        assert self.minute in range(60)
        assert self.second in range(61)

    def _pack(self):
        self.chunk_data.write(
            struct.pack('>HBBBB', 
                self.year,
                self.month,
                self.day,
                self.hour,
                self.minute,
                self.second
            )
        )


@dataclass(repr=False)
class CT_iTXt(I_ChunkTemplate):
    keyword             : bytes = None
    compression_flag    : int = None
    compression_method  : int = None
    language_tag        : bytes = None
    translated_keyword  : bytes = None
    text                : bytes = None

    def _parser(self):
        chunk = self.chunk_data.read()

        self.keyword, chunk = splice_null_sep(chunk)
        self.compression_flag, chunk = chunk[:1], chunk[1:]
        self.compression_method, chunk = chunk[:1], chunk[1:]
        self.language_tag, chunk = splice_null_sep(chunk)
        self.translated_keyword, chunk = splice_null_sep(chunk)
        self.text, chunk = splice_null_sep(chunk)

    def _pack(self):
        self.chunk_data.writelines(
            [
                self.keyword,
                NULL_SEP,
                self.compression_flag,
                self.compression_method,
                self.language_tag,
                NULL_SEP,
                self.translated_keyword,
                NULL_SEP,
                self.text
            ]
        )


@dataclass(repr=False)
class CT_tEXt(I_ChunkTemplate):
    keyword : bytes = ''
    text    : bytes = ''
    
    def _parser(self):
        chunk = self.chunk_data.read()
        self.keyword, self.text = splice_null_sep(chunk)

    def _pack(self):
        self.chunk_data.writelines(
            [
                self.keyword,
                NULL_SEP,
                self.text
            ]
        )


@dataclass(repr=False)
class CT_zTXt(I_ChunkTemplate):
    keyword             : bytes = None
    compression_method  : int = None
    compressed_text     : bytes = None

    def _parser(self):
        chunk = self.chunk_data.read()
        self.keyword, chunk = splice_null_sep(chunk)
        self.compression_method, self.compressed_text = chunk[:1], chunk[1:]

    def _pack(self):
        self.chunk_data.writelines(
            [
                self.keyword,
                NULL_SEP,
                struct.pack('>B', self.compression_method),
                self.compressed_text
            ]
        )

### PNG Class

In [20]:
class PNGImage:
    verbose : bool = False

    def __init__(self):
        self.signature = PNG_SIGNATURE
        self._chunks   = list()
        self._fname    = ''
        self._ext      = '.png'

    def set_filename(self, filename: str):
        self._fname = filename.split('.')[0]

    def get_filename(self):
        return self._fname + self._ext

    def set_signature(self, signature: bytes):
        self.signature = signature

    def check_signature(self, signature: bytes):
        return signature == PNG_SIGNATURE

    def append_chunk(self, ct: I_ChunkTemplate):
        ct.set_pngImageInstance(self)
        self._chunks.append(ct)

    def iter_chunks(self, chunk_type_enum = None):
        for i in self._chunks:
            if chunk_type_enum:
                if i.chunk_type != chunk_type_enum:
                    continue
            yield i

    def get_chunks(self, chunk_type_enum = None, do_parse = False):
        if chunk_type_enum:
            r = list(filter(lambda x: x.chunk_type == chunk_type_enum, self._chunks))
        else:
            r = self._chunks

        if do_parse:
            _ = [i.parse() for i in r]

        return r

    def get_chunk(self, chunk_type_enum = None, do_parse = False):
        r = self.get_chunks(chunk_type_enum)[0]
        if do_parse:
            r.parse()
        return r

    def parse_all(self):
        for c in self.iter_chunks():
            if self.verbose: print_w(f'parsing {c.chunk_type}; {c.chunk_size} bytes')
            errno = c.parse()
            if errno:
                # return
                pass

    def repack_all(self):
        for c in self.iter_chunks():
            if self.verbose: print_w(f'packing {c.chunk_type}; {c.chunk_size} bytes')
            errno = c.repack()
            if errno:
                # return
                pass

    def flush(self, fo):
        fo.write(self.signature)
        for c in self.iter_chunks():
            chunk_data = b''
            if c.chunk_data:
                c.chunk_data.seek(0)
                chunk_data = c.chunk_data.read()
            fo.write(struct.pack('>I', c.chunk_size))
            fo.write(c.chunk_type)
            fo.write(chunk_data)
            fo.write(crc_to_bytes(c.chunk_type + chunk_data))

    def sizeof(self):
        return sum(map(lambda x: x.chunk_size, self.iter_chunks()))

    def __repr__(self):

        # default:
        #
        # return '<%s.%s object at %s>' % (
        #     self.__class__.__module__,
        #     self.__class__.__name__,
        #     hex(id(self))
        # )

        return '<%s %s>' % (
            self.__class__.__name__,
            hex(id(self))
        )

PNGImage.verbose = VERBOSE

### Wrap em up

In [21]:
chunk_struct_constructor_dict = {
    ENUM_PNG_CT_BYTES.IHDR: CT_IHDR,
    ENUM_PNG_CT_BYTES.IDAT: CT_IDAT,
    ENUM_PNG_CT_BYTES.IEND: CT_IEND,
    ENUM_PNG_CT_BYTES.PLTE: CT_PLTE,

    ENUM_PNG_CT_BYTES.cHRM: CT_cHRM,
    ENUM_PNG_CT_BYTES.gAMA: CT_gAMA,
    ENUM_PNG_CT_BYTES.iCCP: CT_iCCP,
    ENUM_PNG_CT_BYTES.sBIT: CT_sBIT,
    ENUM_PNG_CT_BYTES.sRGB: CT_sRGB,
    ENUM_PNG_CT_BYTES.bKGD: CT_bKGD,
    ENUM_PNG_CT_BYTES.hIST: CT_hIST,
    ENUM_PNG_CT_BYTES.tRNS: CT_tRNS,
    ENUM_PNG_CT_BYTES.pHYs: CT_pHYs,
    ENUM_PNG_CT_BYTES.sPLT: CT_sPLT,
    ENUM_PNG_CT_BYTES.tIME: CT_tIME,
    ENUM_PNG_CT_BYTES.iTXt: CT_iTXt,
    ENUM_PNG_CT_BYTES.tEXt: CT_tEXt,
    ENUM_PNG_CT_BYTES.zTXt: CT_zTXt,
}

In [22]:
def parse_png_file(path: Path, do_parse: bool = True) -> PNGImage:

    filename = ''
    if isinstance(path, Path):
        filename = path.name
    elif isinstance(path, str):
        filename = os.path.split(path)[-1]

    if not is_path_png(filename):
        print_w(f'file {filename} does not have correct png extension')
        return

    png = PNGImage()
    png.set_filename(filename)

    with open(path, 'rb') as fo:
        png_signature = fo.read(8)

        # for the sake of continuity, this and the rest of basic value checks does not raise any error
        if not png.check_signature(png_signature):
            print_w(f'WARNING! file does not contain or has correct PNG signature: {png_signature}') 
            png.set_signature(png_signature) 

        while not at_eof(fo):
            chunk_size = _p_uint(fo.read(4))
            chunk_type = fo.read(4)
            chunk_data = fo.read(chunk_size)
            chunk_crc  = fo.read(4)

            if chunk_crc != crc_to_bytes(chunk_type + chunk_data):
                print_w('WARNING! file is corrupted: crc hash does not match')

            if chunk_type not in enum_fetch_all(ENUM_PNG_CT_BYTES):
                print_w(f'WARNING! chunk type {chunk_type} is not present in ENUM_PNG_CT_BYTES')
                continue

            elif chunk_type not in chunk_struct_constructor_dict:
                print_w(f'WARNING! chunk type {chunk_type} parser is not yet implemented')
                continue

            t = chunk_struct_constructor_dict.get(chunk_type)()
            png.append_chunk(t)

            t.chunk_size = chunk_size
            t.chunk_data = BytesIO(chunk_data) if chunk_size else None
            t.chunk_type = chunk_type
            t.chunk_crc = chunk_crc

        if do_parse:
            png.parse_all()

    return png


### `if __name__ == '__main__':`

In [23]:
# %%capture parse_pngs --no-display

total_time_elapsed = 0
total_size_parsed = 0
total_n_files = len(target_files)
for n_f, target_file_path in enumerate(target_files):
    n_f += 1

    # bunch of unnecessary print 
    suppress_print_w = True
    if VERBOSE: print_w('-------------------------------------------------')
    suppress_print_w = False
    if VERBOSE: print_w(f'file {n_f:03}/{total_n_files:03}')
    if VERBOSE: print_w('target file      :', f'"{target_file_path}"')
    if STD_TEST_INPUT_TEST and VERBOSE:
        print_w('test description :', parse_png_test_fname_preset(target_file_path))
    suppress_print_w = True
    if VERBOSE: print_w('-------------------------------------------------')

    start = time.time()
    png = parse_png_file(target_file_path)
    elapsed = time.time() - start

    total_time_elapsed += elapsed
    total_size_parsed += png.sizeof()

    if VERBOSE: print_w(f'parsed chunks:', )
    if png:
        for n, c in enumerate(png.iter_chunks()):
            if VERBOSE: print_w(f'  [{n:03}] -', c)
    
        if TEST_FLUSH_TO_FS:
            if TEST_REPACK:
                if VERBOSE: print_w('repacking parsed result')
                png.repack_all()
            with open(Path(TEST_FLUSH_FOLDER) / png.get_filename(), 'wb') as fo:
                png.flush(fo)

    if VERBOSE: print_w('-------------------------------------------------')
    if VERBOSE: print_w('parser status    :', 'succeed' if all(map(lambda x: x.is_parsed, png.iter_chunks())) else 'failed')
    suppress_print_w = False
    if VERBOSE: print_w('elapsed time     :', f'{elapsed} seconds')
    suppress_print_w = True
    if VERBOSE: print_w('-------------------------------------------------')
    if VERBOSE: print_w()
    suppress_print_w = False
    if VERBOSE: print_w()

if VERBOSE: print_w('=================================================')
if VERBOSE: print_w('some stats:')
if VERBOSE: print_w('  - total file parsed   :', f'{len(target_files)}')
if VERBOSE: print_w('  - inferred total size :', f'{total_size_parsed} bytes')
if VERBOSE: print_w('  - total elapsed time  :', f'{total_time_elapsed} seconds')


file 001/176
target file      : "test-input\basi0g01.png"
test description : black & white
elapsed time     : 0.00498199462890625 seconds

file 002/176
target file      : "test-input\basi0g02.png"
test description : 2 bit (4 level) grayscale
elapsed time     : 0.004961252212524414 seconds

file 003/176
target file      : "test-input\basi0g04.png"
test description : 4 bit (16 level) grayscale
elapsed time     : 0.007637739181518555 seconds

file 004/176
target file      : "test-input\basi0g08.png"
test description : 8 bit (256 level) grayscale
elapsed time     : 0.006098270416259766 seconds

file 005/176
target file      : "test-input\basi0g16.png"
test description : 16 bit (64k level) grayscale
elapsed time     : 0.006433725357055664 seconds

file 006/176
target file      : "test-input\basi2c08.png"
test description : 3x8 bits rgb color
elapsed time     : 0.003019094467163086 seconds

file 007/176
target file      : "test-input\basi2c16.png"
test description : 3x16 bits rgb color
elaps

## Appendix: File Flush Test

In [24]:
def checksum_fixed_buf_s(fp, hash_algo, buf_size=None):
    with open(fp, 'rb') as fo:
        while True:
            if not buf_size:
                d = fo.read()
            else:
                d = fo.read(buf_size)
            if not d:
                break
            hash_algo.update(d)

def splice_seq(nonmutable_seq, length):
    t = []
    while nonmutable_seq:
        a, nonmutable_seq = nonmutable_seq[:length], nonmutable_seq[length:]
        t.append(a)
    return t


if TEST_FLUSH_TO_FS:
    import hashlib, difflib

    perf_hash_algo = hashlib.sha1 # or sha1, md5, and many more within hashlib.algorithms_available,
                                  # preferably hashlib.algorithms_guaranteed

    buf_size = 0xffff

    test_files = list(cwd.glob('*.png'))
    output_files = list(Path(TEST_FLUSH_FOLDER).glob('*.png'))

    def fetch_name(o: Path): return o.name
    def intersect(a: list, b: list, fn): return list(filter(lambda x: fn(x) in map(fn, b), a))

    output_files, test_files = \
        intersect(output_files, test_files, fetch_name), \
        intersect(test_files, output_files, fetch_name)

    c = len(test_files)

    flag = False
    for n, t in enumerate(test_files):
        o = output_files[n]
        n += 1

        t_h = perf_hash_algo()
        o_h = perf_hash_algo()

        checksum_fixed_buf_s(t, t_h, buf_size=buf_size)
        checksum_fixed_buf_s(o, o_h, buf_size=buf_size)

        if not t_h.digest() == o_h.digest(): 
            print(f'{n:03}/{c:03} test file "{t}" and flush output file "{o}" hash does not match -> test desc: {parse_png_test_fname_preset(t.name)}')

            # for i in difflib.diff_bytes(
            #         difflib.unified_diff, 
            #         splice_seq(t.read_bytes(), 4), 
            #         splice_seq(o.read_bytes(), 4)
            #     ):
            #     if i.startswith(b'+') or i.startswith(b'-'):
            #         print(i)

            flag = True

    if not flag:
        print('succeed with no discrepancies')

039/176 test file "test-input\ccwn2c08.png" and flush output file "export\ccwn2c08.png" hash does not match -> test desc: chroma chunk w:0.3127,0.3290 r:0.64,0.33 g:0.30,0.60 b:0.15,0.06
040/176 test file "test-input\ccwn3p08.png" and flush output file "export\ccwn3p08.png" hash does not match -> test desc: chroma chunk w:0.3127,0.3290 r:0.64,0.33 g:0.30,0.60 b:0.15,0.06
064/176 test file "test-input\exif2c08.png" and flush output file "export\exif2c08.png" hash does not match -> test desc: not documented
145/176 test file "test-input\tbbn0g04.png" and flush output file "export\tbbn0g04.png" hash does not match -> test desc: not documented
150/176 test file "test-input\tbrn2c08.png" and flush output file "export\tbrn2c08.png" hash does not match -> test desc: transparent, red background chunk
160/176 test file "test-input\xc9n2c08.png" and flush output file "export\xc9n2c08.png" hash does not match -> test desc: not documented
162/176 test file "test-input\xcsn0g01.png" and flush outpu