Added new Pimarc format
Most of the basic implementation of file reading and writing is now complete, but not yet tested.

This will be used to replace tar as the archive format for storing grouped iterable corpora. For now, it's not used anywhere, but once it's been tested, it will become the standard format for writing GroupedCorpora. We will still support reading from old tar-based corpora, detecting which format has been used.
markgw committed Mar 3, 2020
1 parent b28fd35 commit 1212174
Showing 8 changed files with 439 additions and 0 deletions.
48 changes: 48 additions & 0 deletions src/python/pimlico/utils/pimarc/__init__.py
@@ -0,0 +1,48 @@
"""The Pimlico Archive format
Implementation of a simple multi-file archive format, somewhat like tar.
Pimlico multi-file datasets currently use tar to store many files in one archive. This
was attractive because of its simplicity and the fact that the files can be iterated
over in order efficiently. However, tar is an old format and has certain quirks.
The biggest downside is that random access (reading files not in the order stored or
jumping into the middle of an archive) is very slow.
The Pimlico Archive format (prc) aims to be a very simple generic archive format.
It has the same property as tars that it is fast to iterate over files in order. But
it also stores an index that can be loaded into memory to make it quick to jump into
the archive and potentially access the files in a random order.
It stores very little information about the files. In this sense, it is simpler than
tar. It does not store, for example, file timestamps or permissions, since we do not
need these things for documents in a Pimlico corpus. It does, however, have a generic
JSON metadata dictionary for each file, so metadata like this can be stored as
necessary.
Iterating over files in order is still likely to be substantially faster than random
access (depending on the underlying storage), so it is recommended to add files to
the archive in the sequential order that they are used in. This is the typical use
case in Pimlico: a dataset is created in order, one document at a time, and stored
iteratively. Then another module reads and processes those documents in the same order.
In keeping with this typical use case in Pimlico, a Pimarc can be opened for reading
only, writing only (new archive) or appending, just like normal files. You cannot,
for example, open an archive and move files around, or delete a file. To do these
things, you must read in an archive using a reader and write out a new, modified one
using a writer.
Restrictions on filenames:
Filenames may use any unicode characters, excluding EOF, newline and tab.
"""
from .reader import PimarcReader
from .writer import PimarcWriter


def open_archive(path, mode="r"):
    """
    Open a Pimarc archive for reading ("r"), writing a new archive ("w")
    or appending to an existing one ("a").
    """
    if mode == "r":
        return PimarcReader(path)
    elif mode in ("w", "a"):
        return PimarcWriter(path, mode=mode)
    else:
        raise ValueError("unknown mode '{}'".format(mode))
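
As a usage sketch (archive path and document contents are illustrative, not from this
commit): write a new archive with open_archive, then read it back sequentially or by
name. This relies on the context managers of the reader and writer classes below.

from pimlico.utils.pimarc import open_archive

# Write a new archive; the .prci index is saved alongside the .prc file on exit
with open_archive("corpus.prc", mode="w") as arc:
    arc.write_file({"name": "doc1.txt"}, b"first document's data")
    arc.write_file({"name": "doc2.txt"}, b"second document's data")

# Read it back: sequential iteration is the fast path...
with open_archive("corpus.prc", mode="r") as arc:
    for metadata, data in arc:
        print(metadata["name"], len(data))
    # ...and the in-memory index also allows random access by filename
    metadata, data = arc["doc2.txt"]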
63 changes: 63 additions & 0 deletions src/python/pimlico/utils/pimarc/index.py
@@ -0,0 +1,63 @@
from collections import OrderedDict
from builtins import *


class PimarcIndex(object):
    """
    Simple index to accompany a Pimarc, stored along with the `.prc` file as a
    `.prci` file. Provides a list of the filenames in the archive, along with
    the starting byte of each file's metadata and data.
    """
    def __init__(self):
        self.filenames = OrderedDict()

    def get_metadata_start_byte(self, filename):
        try:
            return self.filenames[filename][0]
        except KeyError:
            raise FilenameNotInArchive(filename)

    def get_data_start_byte(self, filename):
        try:
            return self.filenames[filename][1]
        except KeyError:
            raise FilenameNotInArchive(filename)

    def __getitem__(self, item):
        return self.get_metadata_start_byte(item), self.get_data_start_byte(item)

    def append(self, filename, metadata_start, data_start):
        """
        Add a file to the index, recording the start bytes of its metadata and data.
        """
        if filename in self.filenames:
            raise DuplicateFilename(filename)
        self.filenames[filename] = (metadata_start, data_start)

    @staticmethod
    def load(filename):
        index = PimarcIndex()
        with open(filename, "r") as f:
            for line in f:
                # Remove the newline char
                line = line[:-1]
                # There should be three tab-separated values: filename, metadata start and data start
                doc_filename, metadata_start, data_start = line.split("\t")
                metadata_start, data_start = int(metadata_start), int(data_start)
                index.append(doc_filename, metadata_start, data_start)
        return index

    def save(self, path):
        with open(path, "w") as f:
            for doc_filename, (metadata_start, data_start) in self.filenames.items():
                f.write("{}\t{}\t{}\n".format(doc_filename, metadata_start, data_start))


class FilenameNotInArchive(Exception):
    def __init__(self, filename):
        super().__init__(u"filename '{}' not found in archive".format(filename))
        self.filename = filename


class DuplicateFilename(Exception):
    def __init__(self, filename):
        super().__init__(u"filename '{}' already in archive: cannot add it again".format(filename))
        self.filename = filename
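
The on-disk index is plain text: one line per file, with tab-separated filename,
metadata start byte and data start byte. A minimal sketch, with illustrative offsets:

index = PimarcIndex()
index.append("doc1.txt", 0, 31)       # metadata block starts at byte 0, data at byte 31
index.append("doc2.txt", 1065, 1096)
index.save("corpus.prci")
# corpus.prci now contains:
#   "doc1.txt\t0\t31\n"
#   "doc2.txt\t1065\t1096\n"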
123 changes: 123 additions & 0 deletions src/python/pimlico/utils/pimarc/reader.py
@@ -0,0 +1,123 @@
import json

from .index import PimarcIndex
from pimlico.utils.varint import decode_stream


class PimarcReader(object):
    """
    The Pimlico Archive format: read-only archive.
    """
    def __init__(self, archive_filename):
        self.archive_filename = archive_filename
        self.index_filename = "{}i".format(archive_filename)
        self.index = None
        self.archive_file = None

    def open(self):
        """
        Open the archive file.
        """
        return open(self.archive_filename, mode="rb")

    def __enter__(self):
        self.archive_file = self.open()
        self.index = PimarcIndex.load(self.index_filename)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.archive_file.close()

    def __getitem__(self, item):
        """
        Random access into the archive. Load a named file's data and metadata.
        """
        # Look up the filename in the index and get pointers to its metadata and data
        metadata_start, data_start = self.index[item]
        # Jump to the start of the metadata
        self.archive_file.seek(metadata_start)
        # Read the metadata
        metadata = self._read_metadata()
        # There's some redundancy here: we're now presumably at the start
        # of the data, so don't need data_start
        # Assume that this is the case and continue reading from where we stopped
        data = _read_var_length_data(self.archive_file)
        return metadata, data

    def _read_metadata(self):
        """
        Assuming the file is currently at the start of a metadata block, read and
        parse that metadata.
        """
        # Read the metadata
        metadata_data = _read_var_length_data(self.archive_file)
        # Decode the metadata and parse as JSON
        metadata = json.loads(metadata_data.decode("utf-8"))
        return metadata

    def iter_metadata(self):
        """
        Iterate over all files in the archive, yielding just the metadata, skipping
        over the data.
        """
        # Make sure we're at the start of the file
        self.archive_file.seek(0)
        while True:
            # Try reading the metadata of the next file
            try:
                metadata = self._read_metadata()
            except EOFError:
                # At this point, it's normal to get an EOF: we've just got to the end neatly
                break
            # This should be followed by the file's data, which we skip over, since we don't need it
            _skip_var_length_data(self.archive_file)
            yield metadata

    def iter_files(self):
        """
        Iterate over files, together with their JSON metadata, which includes their name (as "name").
        """
        # Make sure we're at the start of the file
        self.archive_file.seek(0)
        while True:
            # Try reading the metadata of the next file
            try:
                metadata = self._read_metadata()
            except EOFError:
                # At this point, it's normal to get an EOF: we've just got to the end neatly
                break
            # This should be followed by the file's data immediately
            # Read it in
            # If there's an EOF here, something's wrong with the file
            data = _read_var_length_data(self.archive_file)
            yield metadata, data

    def __iter__(self):
        return self.iter_files()


def _read_var_length_data(reader):
    """
    Read some data from a file-like object by first reading a varint that says how many
    bytes are in the data and then reading the data immediately following.
    """
    # Get a single varint from the reader stream
    data_length = decode_stream(reader)
    # Read the data as a bytes array
    return reader.read(data_length)


def _skip_var_length_data(reader):
    """
    Like read_var_length_data, but doesn't actually read the data. Just reads the length
    indicator and seeks to the end of the data.
    """
    data_length = decode_stream(reader)
    reader.seek(data_length, 1)
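
The reader assumes a simple record layout: for each file, a varint-length-prefixed
UTF-8 JSON metadata block, followed immediately by a varint-length-prefixed data
block, repeated until EOF. A sketch of a metadata-only pass (archive path
illustrative), which iter_metadata makes cheap by seeking past each data block:

with PimarcReader("corpus.prc") as reader:
    # Collect all document names without reading any document data
    names = [metadata["name"] for metadata in reader.iter_metadata()]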
102 changes: 102 additions & 0 deletions src/python/pimlico/utils/pimarc/writer.py
@@ -0,0 +1,102 @@
import json
import os

from future.utils import raise_from

from pimlico.utils.varint import encode
from .index import PimarcIndex


class PimarcWriter(object):
    """
    The Pimlico Archive format: writing new archives or appending to existing ones.
    """
    def __init__(self, archive_filename, mode="w"):
        self.archive_filename = archive_filename
        self.index_filename = "{}i".format(archive_filename)
        self.append = mode == "a"

        if self.append:
            # Check the old archive already exists
            if not os.path.exists(archive_filename):
                raise IOError("cannot append to non-existent archive: {}".format(archive_filename))
            if not os.path.exists(self.index_filename):
                raise IOError("cannot append to archive: index file doesn't exist: {}".format(self.index_filename))
        else:
            # Remove any existing files
            if os.path.exists(archive_filename):
                os.remove(archive_filename)
            if os.path.exists(self.index_filename):
                os.remove(self.index_filename)

    def open(self):
        """
        Open the archive file.
        """
        return open(self.archive_filename, mode="ab" if self.append else "wb")

    def __enter__(self):
        self.archive_file = self.open()
        if self.append:
            # Load the index so far
            self.index = PimarcIndex.load(self.index_filename)
        else:
            # Create an empty index
            self.index = PimarcIndex()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.archive_file.close()
        # We need to store the updated index, which is only updated in memory while writing
        self.index.save(self.index_filename)

    def write_file(self, metadata, data):
        """
        Append a file to the end of the archive. The metadata should be a dictionary
        that can be encoded as JSON (which is how it will be stored). The data should
        be a bytes object.
        """
        # The file's name should always be in the metadata as "name"
        try:
            filename = metadata["name"]
        except KeyError:
            raise MetadataError("metadata should include 'name' key")
        # Check where we're up to in the file
        # This tells us where the metadata starts, which will be stored in the index
        metadata_start = self.archive_file.tell()
        # Encode the metadata as utf-8 JSON
        try:
            metadata_data = json.dumps(metadata).encode("utf-8")
        except Exception as e:
            raise_from(MetadataError("problem encoding metadata as JSON"), e)

        # Write it to the file, including its length
        _write_var_length_data(self.archive_file, metadata_data)

        # Check where we're up to in the file
        # This tells us where the file data starts, which will be stored in the index
        data_start = self.archive_file.tell()
        # Write out the data, including its length
        _write_var_length_data(self.archive_file, data)

        # Add the file to the index
        self.index.append(filename, metadata_start, data_start)


def _write_var_length_data(writer, data):
    """
    Write some data to a file-like object by first writing a varint that says how many
    bytes are in the data and then writing the data immediately following.
    """
    # Store the length of the data in bytes
    data_length = len(data)
    writer.write(encode(data_length))
    # Write the data as a bytes array
    return writer.write(data)


class MetadataError(Exception):
    pass
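
For clarity, this is the byte layout that write_file produces for a single file,
built here by hand into an in-memory buffer (a sketch with illustrative values, not
part of the commit):

import io
import json

from pimlico.utils.varint import encode

buf = io.BytesIO()
metadata = json.dumps({"name": "doc1.txt"}).encode("utf-8")
data = b"document contents"
metadata_start = buf.tell()         # recorded in the index
buf.write(encode(len(metadata)))    # varint length prefix, then the metadata itself
buf.write(metadata)
data_start = buf.tell()             # also recorded in the index
buf.write(encode(len(data)))        # varint length prefix, then the raw data
buf.write(data)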
