diff --git a/HISTORY.rst b/HISTORY.rst index 762a034..5873eac 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,22 @@ History ------- +1.2.0 (2015-04-XX) +++++++++++++++++++ + +* Previously if ``MODE_FILE`` was used and the database was loaded before + forking, the parent and children would use the same file table entry without + locking causing errors reading the database due to the offset being changed + by other processes. In ``MODE_FILE``, the reader will now use ``os.pread`` + when available and a lock when ``os.pread`` is not available (e.g., Python + 2). If you are using ``MODE_FILE`` on a Python without ``os.pread``, it is + recommended that you open the database after forking to reduce resource + contention. +* The ``Metadata`` class now overloads ``__repr__`` to provide a useful + representation of the contents when debugging. +* An ``InvalidDatabaseError`` will now be thrown if the data type read from + the database is invalid. Previously a ``KeyError`` was thrown. + 1.1.1 (2014-12-10) ++++++++++++++++++ diff --git a/maxminddb/compat.py b/maxminddb/compat.py index 24e4f0e..14c9883 100644 --- a/maxminddb/compat.py +++ b/maxminddb/compat.py @@ -2,19 +2,10 @@ # pylint: skip-file -is_py2 = sys.version_info[0] == 2 - -is_py3_3_or_better = ( - sys.version_info[0] >= 3 and sys.version_info[1] >= 3) - -if is_py2 and not is_py3_3_or_better: +if sys.version_info[0] == 2: import ipaddr as ipaddress # pylint:disable=F0401 ipaddress.ip_address = ipaddress.IPAddress -else: - import ipaddress # pylint:disable=F0401 - -if is_py2: int_from_byte = ord FileNotFoundError = IOError @@ -25,8 +16,9 @@ def int_from_bytes(b): return 0 byte_from_int = chr - else: + import ipaddress # pylint:disable=F0401 + int_from_byte = lambda x: x FileNotFoundError = FileNotFoundError diff --git a/maxminddb/decoder.py b/maxminddb/decoder.py index 7bc46d4..1b8f071 100644 --- a/maxminddb/decoder.py +++ b/maxminddb/decoder.py @@ -126,6 +126,10 @@ def decode(self, offset): if not type_num: 
(type_num, new_offset) = self._read_extended(new_offset) + if not type_num in self._type_decoder: + raise InvalidDatabaseError('Unexpected type number ({type}) ' + 'encountered'.format(type=type_num)) + (size, new_offset) = self._size_from_ctrl_byte( ctrl_byte, new_offset, type_num) return self._type_decoder[type_num](self, size, new_offset) diff --git a/maxminddb/file.py b/maxminddb/file.py index 3abcf8f..3460893 100644 --- a/maxminddb/file.py +++ b/maxminddb/file.py @@ -1,7 +1,12 @@ -"""This is intended for internal use only""" +"""For internal use only. It provides a slice-like file reader.""" import os +try: + from multiprocessing import Lock +except ImportError: + from threading import Lock + class FileBuffer(object): @@ -10,21 +15,20 @@ class FileBuffer(object): def __init__(self, database): self._handle = open(database, 'rb') self._size = os.fstat(self._handle.fileno()).st_size + if not hasattr(os, 'pread'): + self._lock = Lock() def __getitem__(self, key): if isinstance(key, slice): - self._handle.seek(key.start) - return self._handle.read(key.stop - key.start) + return self._read(key.stop - key.start, key.start) elif isinstance(key, int): - self._handle.seek(key) - return self._handle.read(1) + return self._read(1, key) else: raise TypeError("Invalid argument type.") def rfind(self, needle, start): """Reverse find needle from start""" - self._handle.seek(start) - pos = self._handle.read(self._size - start - 1).rfind(needle) + pos = self._read(self._size - start - 1, start).rfind(needle) if pos == -1: return pos return start + pos @@ -36,3 +40,26 @@ def size(self): def close(self): """Close file""" self._handle.close() + + if hasattr(os, 'pread'): + + def _read(self, buffersize, offset): + """read that uses pread""" + # pylint: disable=no-member + return os.pread(self._handle.fileno(), buffersize, offset) + + else: + + def _read(self, buffersize, offset): + """read with a lock + + This lock is necessary as after a fork, the different processes + will share 
the same file table entry, even if we dup the fd, and + as such the same offsets. There does not appear to be a way to + duplicate the file table entry and we cannot re-open based on the + original path as that file may have been replaced with another or + unlinked. + """ + with self._lock: + self._handle.seek(offset) + return self._handle.read(buffersize) diff --git a/maxminddb/reader.py b/maxminddb/reader.py index 6c0b24f..5ecfbdf 100644 --- a/maxminddb/reader.py +++ b/maxminddb/reader.py @@ -76,7 +76,8 @@ def __init__(self, database, mode=MODE_AUTO): metadata_start += len(self._METADATA_START_MARKER) metadata_decoder = Decoder(self._buffer, metadata_start) (metadata, _) = metadata_decoder.decode(metadata_start) - self._metadata = Metadata(**metadata) # pylint: disable=star-args + self._metadata = Metadata( + **metadata) # pylint: disable=bad-option-value + self._decoder = Decoder(self._buffer, self._metadata.search_tree_size + self._DATA_SECTION_SEPARATOR_SIZE) @@ -176,6 +177,7 @@ def _resolve_data_pointer(self, pointer): def close(self): """Closes the MaxMind DB file and returns the resources to the system""" + # pylint: disable=unidiomatic-typecheck if type(self._buffer) not in (str, bytes): self._buffer.close() @@ -210,3 +212,10 @@ def node_byte_size(self): def search_tree_size(self): """The size of the search tree""" return self.node_count * self.node_byte_size + + def __repr__(self): + args = ', '.join('%s=%r' % x for x in self.__dict__.items()) + return '{module}.{class_name}({data})'.format( + module=self.__module__, + class_name=self.__class__.__name__, + data=args) diff --git a/tests/data b/tests/data index f887fec..90c7fb9 160000 --- a/tests/data +++ b/tests/data @@ -1 +1 @@ -Subproject commit f887fec99eabf6d68746e257b894bae854fefc27 +Subproject commit 90c7fb95d67ee03ca7fc487fb69f525bcc19a671 diff --git a/tests/reader_test.py b/tests/reader_test.py index d7a659e..0e70157 100644 --- a/tests/reader_test.py +++ b/tests/reader_test.py @@ -3,8 +3,12 @@ from 
__future__ import unicode_literals +import logging import os import sys +import threading + +from multiprocessing import Process, Pipe import maxminddb @@ -233,6 +237,43 @@ def test_closed_metadata(self): self.assertIsNotNone( metadata, 'pure Python implementation returns value') + def test_multiprocessing(self): + self._check_concurrency(Process) + + def test_threading(self): + self._check_concurrency(threading.Thread) + + def _check_concurrency(self, worker_class): + reader = open_database( + 'tests/data/test-data/GeoIP2-Domain-Test.mmdb', + self.mode + ) + + def lookup(pipe): + try: + for i in range(32): + reader.get('65.115.240.{i}'.format(i=i)) + pipe.send(1) + except: + pipe.send(0) + finally: + if worker_class is Process: + reader.close() + pipe.close() + + pipes = [Pipe() for _ in range(32)] + procs = [worker_class(target=lookup, args=(c,)) for (p, c) in pipes] + for proc in procs: + proc.start() + for proc in procs: + proc.join() + + reader.close() + + count = sum([p.recv() for (p, c) in pipes]) + + self.assertEqual(count, 32, 'expected number of successful lookups') + def _check_metadata(self, reader, ip_version, record_size): metadata = reader.metadata()