In [17]:
# Quick Function Benchmarks

In [27]:
## setup
import unicodedata


def force_unicode(s):
    """Return the text form of ``s`` as produced by the built-in ``str``."""
    text = str(s)
    return text


def normalize_unicode(s):
    """Return ``s`` converted to Unicode normalization form NFC."""
    target_form = "NFC"
    return unicodedata.normalize(target_form, s)


class BagError(Exception):
    """Common base class for the exception types defined in this notebook."""


class BagValidationError(BagError):
    """
    Error carrying a summary ``message`` plus an optional list of ``details``.

    ``message`` is the text returned by ``str()`` when there are no details.
    ``details`` is a list of further items; each one is rendered with
    ``force_unicode`` and appended to the message, separated by ``"; "``.
    """

    def __init__(self, message, details=None):
        # Forward the message so that ``args`` is populated. With an empty
        # ``args`` the exception cannot be re-created by pickle or copy,
        # because they call the class with ``*args`` and ``message`` is a
        # required parameter.
        super(BagValidationError, self).__init__(message)

        if details is None:
            # A fresh list per instance instead of a shared mutable default.
            details = []

        self.message = message
        self.details = details

    def __str__(self):
        if len(self.details) > 0:
            details = "; ".join([force_unicode(e) for e in self.details])
            return "%s: %s" % (self.message, details)
        return self.message


class ManifestErrorDetail(BagError):
    """Base class for detail errors that refer to one path, stored as ``path``."""

    def __init__(self, path):
        # Forward the path so that ``args == (path,)``. With an empty ``args``
        # neither this class nor its subclasses can be re-created by pickle or
        # copy, because they call the class with ``*args`` and ``path`` is a
        # required parameter.
        super(ManifestErrorDetail, self).__init__(path)

        self.path = path


class ChecksumMismatch(ManifestErrorDetail):
    """
    Detail error recording a checksum that did not match.

    Stores the ``path``, the ``algorithm`` name, and the ``expected`` and
    ``found`` values; all four appear in the string form of the error.
    """

    def __init__(self, path, algorithm=None, expected=None, found=None):
        super(ChecksumMismatch, self).__init__(path)

        self.path = path
        self.algorithm, self.expected, self.found = algorithm, expected, found

    def __str__(self):
        template = _(
            '%(path)s %(algorithm)s validation failed: expected="%(expected)s" found="%(found)s"'
        )
        # Only the path is coerced to text; the other values are left to the
        # %s conversions in the template.
        values = dict(
            path=force_unicode(self.path),
            algorithm=self.algorithm,
            expected=self.expected,
            found=self.found,
        )
        return template % values


class FileMissing(ManifestErrorDetail):
    """Detail error for a path that is in the manifest but not on the filesystem."""

    def __str__(self):
        template = _("%s exists in manifest but was not found on filesystem")
        return template % force_unicode(self.path)


class UnexpectedFile(ManifestErrorDetail):
    """Detail error for a path that is on the filesystem but not in the manifest."""

    def __str__(self):
        # Coerce the path to text first, as the sibling detail classes do, so
        # that a non-string path (for example a tuple) is not treated by the
        # % operator as a sequence of format arguments.
        return _(
            "%s exists on filesystem but is not in the manifest"
        ) % force_unicode(self.path)


class FileNormalizationConflict(BagError):
    """
    Exception raised when two files differ only in normalization and thus
    are not safely portable

    The two conflicting names are stored as ``file_a`` and ``file_b`` and both
    appear in the string form of the error.
    """

    def __init__(self, file_a, file_b):
        # Forward both names so that ``args == (file_a, file_b)``. With an
        # empty ``args`` the exception cannot be re-created by pickle or copy,
        # because they call the class with ``*args`` and both parameters are
        # required.
        super(FileNormalizationConflict, self).__init__(file_a, file_b)

        self.file_a = file_a
        self.file_b = file_b

    def __str__(self):
        return _(
            'Unicode normalization conflict for file "%(file_a)s" and "%(file_b)s"'
        ) % {"file_a": self.file_a, "file_b": self.file_b}

In [28]:
## filenames.py

In [29]:
import re

In [30]:
def _encode_filename_nochain(s):
    """
    Percent-encode line endings in ``s``: CR becomes ``%0D``, LF becomes ``%0A``.

    Benchmark variant that rebinds ``s`` after each ``replace`` call instead of
    chaining the calls in a single expression.
    """
    s = s.replace("\r", "%0D")
    s = s.replace("\n", "%0A")
    return s


def _decode_filename_regex(s):
    s = re.sub(r"%0D", "\r", s, re.IGNORECASE)
    s = re.sub(r"%0A", "\n", s, re.IGNORECASE)
    return s


In [31]:
def _encode_filename_chain(s):
    """
    Percent-encode line endings in ``s``: CR becomes ``%0D``, LF becomes ``%0A``.

    Benchmark variant that chains both ``replace`` calls in one expression.
    """
    return s.replace("\r", "%0D").replace("\n", "%0A")

def _decode_filename_replace(s):
    """
    Decode percent-encoded line endings in ``s`` using plain ``str.replace``.

    ``%0D``/``%0d`` become a carriage return and ``%0A``/``%0a`` become a line
    feed; both spellings of the hex digit are handled by separate calls.
    """
    return s.replace("%0D", "\r").replace("%0A", "\n").replace("%0d", "\r").replace("%0a", "\n")

In [32]:
%%timeit 
# The sample name contains no CR or LF, so neither replace call substitutes anything.
_encode_filename_nochain('some_test_filename_98239018.txt')

84.8 ns ± 0.599 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)


In [33]:
%%timeit 
# Same sample name as the non-chained run; it contains no CR or LF.
_encode_filename_chain('some_test_filename_98239018.txt')

79.2 ns ± 0.284 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)


In [34]:
%%timeit 
# The sample name contains no percent-escapes, so only the no-match path is timed.
_decode_filename_regex('some_test_filename_98239018.txt')

528 ns ± 6.5 ns per loop (mean ± std. dev. of 7 runs, 1,000,000 loops each)


In [35]:
%%timeit
# Same sample name as the regex run; it contains no percent-escapes.
_decode_filename_replace('some_test_filename_98239018.txt')

131 ns ± 0.926 ns per loop (mean ± std. dev. of 7 runs, 10,000,000 loops each)


In [36]:
## hashing.py

In [37]:
import hashlib
import os
import logging

In [38]:
def _calculate_file_hashes_old(full_path, f_hashers):
    """
    Returns a dictionary of (algorithm, hexdigest) values for the provided
    filename

    Baseline variant for the benchmark: reads with an explicit ``while True``
    loop and calls ``f_hashers.values()`` again for every block.

    ``full_path`` is opened in binary mode and read in blocks of at most
    ``HASH_BLOCK_SIZE`` bytes. ``f_hashers`` maps an algorithm name to an
    object providing ``update`` and ``hexdigest``; every block is fed to each
    of them, so the objects passed in are updated in place.

    Raises ``BagValidationError`` when opening or reading the file fails with
    ``OSError`` or ``IOError``.
    """
    LOGGER.info(_("Verifying checksum for file %s"), full_path)

    try:
        with open(full_path, "rb") as f:
            while True:
                block = f.read(HASH_BLOCK_SIZE)
                # An empty read marks the end of the file.
                if not block:
                    break
                for i in f_hashers.values():
                    i.update(block)
    except (OSError, IOError) as e:
        raise BagValidationError(
            _("Could not read %(filename)s: %(error)s")
            % {"filename": full_path, "error": force_unicode(e)}
        )

    return dict((alg, h.hexdigest()) for alg, h in f_hashers.items())


In [39]:
def _calculate_file_hashes_new(full_path, f_hashers):
    """
    Returns a dictionary of (algorithm, hexdigest) values for the provided
    filename

    Candidate variant for the benchmark: the hasher objects are collected into
    a list once, and the file is read through the two-argument form of
    ``iter`` with ``b''`` as the sentinel.

    ``full_path`` is opened in binary mode and read in blocks of at most
    ``HASH_BLOCK_SIZE`` bytes. ``f_hashers`` maps an algorithm name to an
    object providing ``update`` and ``hexdigest``; every block is fed to each
    of them, so the objects passed in are updated in place.

    Raises ``BagValidationError`` when opening or reading the file fails with
    ``OSError`` or ``IOError``.
    """
    LOGGER.info(_("Verifying checksum for file %s"), full_path)
    
    hashers = list(f_hashers.values())  # Get hashers once before the loop

    try:
        with open(full_path, "rb") as f:
            # iter() keeps calling the lambda until it returns b'' (end of file).
            for block in iter(lambda: f.read(HASH_BLOCK_SIZE), b''):
                for hasher in hashers:
                    hasher.update(block)
    except (OSError, IOError) as e:
        raise BagValidationError(
            _("Could not read %(filename)s: %(error)s")
            % {"filename": full_path, "error": force_unicode(e)}
        )

    return {alg: hasher.hexdigest() for alg, hasher in f_hashers.items()}


In [None]:
LOGGER = logging.getLogger('notebook_test')
hashes = ['sha512']
# Algorithm names accepted for hashing. Without this assignment the name is
# undefined on a fresh kernel and the filter below raises NameError.
algorithms = hashlib.algorithms_guaranteed
# One hasher object per requested algorithm that is also in ``algorithms``.
f_hashers = {alg: hashlib.new(alg) for alg in hashes if alg in algorithms}
# NOTE(review): the benchmarked functions also read HASH_BLOCK_SIZE and the
# gettext alias ``_``, and the timing cell reads ``full_path``; none of these
# is defined in the visible cells - confirm they are set before running.

In [None]:
%%timeit 
# NOTE(review): ``full_path`` is not defined in the visible cells - confirm it is set first.
# The same hasher objects in ``f_hashers`` are reused, and keep accumulating data, on every timing loop.
_calculate_file_hashes_old(full_path, f_hashers)