
Commit

Added new module for mapping a term-feature-count corpus to indices using a dictionary and storing it in a compact, efficient binary format.

Several bug fixes.
Mark Granroth-Wilding committed Apr 6, 2016
1 parent 9105e61 commit eae8b00
Showing 11 changed files with 242 additions and 30 deletions.
5 changes: 3 additions & 2 deletions src/python/pimlico/core/config.py
@@ -141,11 +141,12 @@ def load_module_info(self, module_name):
try:
# First see if this is a datatype
datatype_class = load_datatype(module_config["type"])
-            # Get an input module info class for this datatype
-            module_info_class = input_module_factory(datatype_class)
except DatatypeLoadError:
# Not a datatype
module_info_class = load_module_info(module_config["type"])
+        else:
+            # Get an input module info class for this datatype
+            module_info_class = input_module_factory(datatype_class)

# Allow document map types to be used as filters simply by specifying filter=T
filter_type = str_to_bool(module_config.pop("filter", ""))
11 changes: 6 additions & 5 deletions src/python/pimlico/datatypes/base.py
@@ -109,9 +109,10 @@ def write_metadata(self):
pickle.dump(self.metadata, f, -1)


-class IterableDocumentCorpus(PimlicoDatatype):
+class IterableCorpus(PimlicoDatatype):
"""
-    Superclass of all datatypes which represent a dataset that can be iterated over document by document.
+    Superclass of all datatypes which represent a dataset that can be iterated over document by document
+    (or datapoint by datapoint - what exactly we're iterating over may vary, though documents are most common).
The actual type of the data depends on the subclass: it could be, e.g. coref output, etc.
At creation time, length should be provided in the metadata, denoting how many documents are in the dataset.
@@ -120,7 +121,7 @@ class IterableDocumentCorpus(PimlicoDatatype):
datatype_name = "iterable_corpus"

def __init__(self, *args, **kwargs):
-        super(IterableDocumentCorpus, self).__init__(*args, **kwargs)
+        super(IterableCorpus, self).__init__(*args, **kwargs)

def __iter__(self):
"""
@@ -135,9 +136,9 @@ def __len__(self):
return self.metadata["length"]


-class IterableDocumentCorpusWriter(PimlicoDatatypeWriter):
+class IterableCorpusWriter(PimlicoDatatypeWriter):
def __exit__(self, exc_type, exc_val, exc_tb):
-        super(IterableDocumentCorpusWriter, self).__exit__(exc_type, exc_val, exc_tb)
+        super(IterableCorpusWriter, self).__exit__(exc_type, exc_val, exc_tb)
# Check the length has been set
if "length" not in self.metadata:
raise DatatypeWriteError("writer for IterableDocumentCorpus must set a 'length' value in the metadata")
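
The contract described in the IterableCorpus docstring above — iteration document by document, with a "length" value recorded in the metadata — can be illustrated with a minimal, hypothetical subclass (a sketch for orientation only; ToyCorpusWriter and add_documents are invented names, not part of this commit):

class ToyCorpusWriter(IterableCorpusWriter):
    def add_documents(self, docs):
        # ... write each document out under self.data_dir ...
        # Record how many documents were stored: IterableCorpusWriter.__exit__
        # raises DatatypeWriteError if this is missing, and IterableCorpus.__len__
        # reads it back as the corpus length.
        self.metadata["length"] = len(docs)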
179 changes: 179 additions & 0 deletions src/python/pimlico/datatypes/features.py
@@ -1,6 +1,11 @@
import os
import struct

import cPickle as pickle
from operator import itemgetter

from pimlico.core.modules.map import invalid_doc_on_error
from pimlico.datatypes.base import IterableCorpus, DatatypeLoadError, IterableCorpusWriter
from pimlico.datatypes.tar import TarredCorpus, TarredCorpusWriter, pass_up_invalid


@@ -109,6 +114,180 @@ def document_to_raw_data(self, doc):
return super(TermFeatureListCorpusWriter, self).document_to_raw_data(data)


BYTE_FORMATS = {
# (num bytes, signed)
(1, True): "b", # signed char
(1, False): "B", # unsigned char
(2, True): "h", # signed short
(2, False): "H", # unsigned short
(4, True): "l", # signed long
(4, False): "L", # unsigned long
(8, True): "q", # signed long long
(8, False): "Q", # unsigned long long
}

def get_struct(bytes, signed):
# Put together the formatting string for converting ints to bytes
if (bytes, signed) not in BYTE_FORMATS:
raise ValueError("invalid specification for int format: signed=%s, bytes=%s. signed must be bool, "
"bytes in [1, 2, 4, 8]" % (signed, bytes))
format_string = "<" + BYTE_FORMATS[(bytes, signed)]
# Compile the format for faster encoding
return struct.Struct(format_string)
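
As a quick usage sketch (illustrative, not part of the diff), the compiled Struct returned by get_struct packs and unpacks single little-endian ints:

# Sketch: 4-byte unsigned ints, little-endian ("<L")
packer = get_struct(4, False)
raw = packer.pack(42)             # 4 bytes: '\x2a\x00\x00\x00'
value = packer.unpack(raw)[0]     # unpack returns a 1-tuple
assert value == 42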


class IndexedTermFeatureListCorpus(IterableCorpus):
"""
Term-feature instances, indexed by a dictionary, so that all that's stored is the indices of the terms
and features and the feature counts for each instance. This is iterable, but, unlike TermFeatureListCorpus,
doesn't iterate over documents. Now that we've filtered extracted features down to a smaller vocab, we
put everything in one big file, with one data point per line.
Since we're now storing indices, we can use a compact format that's fast to read from disk, making iterating
over the dataset faster than if we had to read strings, look them up in the vocab, etc.
By default, the ints are stored as C longs, which use 4 bytes. If you know you don't need ints this
big, you can choose 1 or 2 bytes, or even 8 (long long). By default, the ints are unsigned, but they
may be signed.
"""
def __init__(self, *args, **kwargs):
super(IndexedTermFeatureListCorpus, self).__init__(*args, **kwargs)
self._term_dictionary = None
self._feature_dictionary = None

@property
def term_dictionary(self):
if self._term_dictionary is None:
# Read in the dictionary from a file
with open(os.path.join(self.data_dir, "term_dictionary"), "r") as f:
self._term_dictionary = pickle.load(f)
return self._term_dictionary

@property
def feature_dictionary(self):
if self._feature_dictionary is None:
# Read in the dictionary from a file
with open(os.path.join(self.data_dir, "feature_dictionary"), "r") as f:
self._feature_dictionary = pickle.load(f)
return self._feature_dictionary

def __iter__(self):
# Check how many bytes to use per int
bytes = self.metadata.get("bytes", 4)
signed = self.metadata.get("signed", False)
# Prepare a struct for reading one int at a time
int_unpacker = get_struct(bytes, signed)
# Prepare another struct for reading the number of feature counts in a data point
length_unpacker = get_struct(2, False)

with open(os.path.join(self.data_dir, "data"), "r") as data_file:
try:
while True:
# Read data for a single int - the term
term = _read_binary_int(data_file, int_unpacker)
# Next digit tells us the number of feature counts in this data point
num_features = _read_binary_int(data_file, length_unpacker)
# Now read two ints for every feature: the feature id and the count
feature_counts = dict(
(_read_binary_int(data_file, int_unpacker), _read_binary_int(data_file, int_unpacker))
for i in range(num_features)
)
yield term, feature_counts
except StopIteration:
# Reached end of file
pass
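
Taken together with the writer below, the on-disk layout of one data point under the default settings (4-byte unsigned ints for ids and counts, 2-byte unsigned length field) is, roughly sketched:

# term id       : 4 bytes ("<L")
# num features  : 2 bytes ("<H")
# per feature   : feature id (4 bytes) followed by its count (4 bytes)
#
# e.g. term 7 with feature counts {3: 2, 9: 1} is written as the byte string
#   pack("<L", 7) + pack("<H", 2) + pack("<L", 3) + pack("<L", 2) + pack("<L", 9) + pack("<L", 1)
# (feature pairs appear in whatever order the dict yields them)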


class IndexedTermFeatureListCorpusWriter(IterableCorpusWriter):
"""
index_input=True means that the input terms and feature names are already mapped to dictionary indices, so
are assumed to be ints. Otherwise, inputs will be looked up in the appropriate dictionary to get an index.
"""
def __init__(self, base_dir, term_dictionary, feature_dictionary, bytes=4, signed=False, index_input=False):
super(IndexedTermFeatureListCorpusWriter, self).__init__(base_dir)
self.metadata["bytes"] = bytes
self.metadata["signed"] = signed
self.write_metadata()

self.feature_dictionary = feature_dictionary
self.term_dictionary = term_dictionary
# Write the dictionaries out to disk straight away
self.write_dictionaries()

self.int_packer = get_struct(self.metadata["bytes"], self.metadata["signed"])
self.length_packer = get_struct(2, False)
self.index_input = index_input

self._added_data_points = 0

def write_dictionaries(self):
with open(os.path.join(self.data_dir, "term_dictionary"), "w") as f:
pickle.dump(self.term_dictionary, f, -1)
with open(os.path.join(self.data_dir, "feature_dictionary"), "w") as f:
pickle.dump(self.feature_dictionary, f, -1)

def __enter__(self):
self.data_file = open(os.path.join(self.data_dir, "data"), "w")
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.data_file.close()
# Set the length to be the number of data points written
self.metadata["length"] = self._added_data_points
super(IndexedTermFeatureListCorpusWriter, self).__exit__(exc_type, exc_val, exc_tb)

def add_data_points(self, iterable):
for term, feature_counts in iterable:
feature_counts = feature_counts.items()
if not self.index_input:
# Look up indices in the dictionaries
if term not in self.term_dictionary.token2id:
# Skip unknown terms
continue
term = self.term_dictionary.token2id[term]
# Look up all feature name indices, skipping unknown features
feature_counts = [(self.feature_dictionary.token2id[f], cnt) for (f, cnt) in feature_counts
if f in self.feature_dictionary.token2id]
# Filter out zero counts
feature_counts = [(f, c) for (f, c) in feature_counts if c > 0]
if len(feature_counts) == 0:
# If there aren't any features left after filtering on dictionary and counts, skip whole data point
continue

# Now we've converted to indices, write the data out
# First write the term index
_write_binary_int(self.data_file, self.int_packer, term)
# Next the number of feature counts so we know how much to read when reading in
_write_binary_int(self.data_file, self.length_packer, len(feature_counts))
# Now all the features and their counts
for f, c in feature_counts:
_write_binary_int(self.data_file, self.int_packer, f)
_write_binary_int(self.data_file, self.int_packer, c)

# Keep a record of how much data we wrote so we can make the corpus length available to future modules
self._added_data_points += 1
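
A minimal usage sketch of the writer (illustrative only: output_dir and the two dictionaries are assumed to exist, and the dictionaries just need to expose the token2id mapping that the lookup code above uses):

with IndexedTermFeatureListCorpusWriter(output_dir, term_dict, feature_dict) as writer:
    writer.add_data_points([
        ("dog", {"subj_of:bark": 2, "obj_of:walk": 1}),
        ("cat", {"subj_of:miaow": 1}),
    ])
# On exit the data file is closed and the number of data points written is
# stored as the corpus "length" in the metadata.

With index_input=True, the terms and feature keys passed to add_data_points are taken to be integer indices already and no dictionary lookup is performed.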


def _read_binary_int(f, unpacker):
# Read the right number of bytes from the file
string = f.read(unpacker.size)
if string == "":
raise StopIteration
try:
        # Try decoding this as a single int (unpack returns a 1-tuple)
        integer = unpacker.unpack(string)[0]
except struct.error, e:
raise DatatypeLoadError("could not unpack integer data in file: corrupt data? %s" % e)
return integer


def _write_binary_int(f, packer, x):
string = packer.pack(x)
f.write(string)


def escape_sep(sep, sep_type, text):
return text.replace(sep, "~~%s~~" % sep_type)

8 changes: 4 additions & 4 deletions src/python/pimlico/datatypes/tar.py
@@ -8,11 +8,11 @@
import zlib
from itertools import izip

-from pimlico.datatypes.base import IterableDocumentCorpusWriter, InvalidDocument
-from .base import IterableDocumentCorpus
+from pimlico.datatypes.base import IterableCorpusWriter, InvalidDocument
+from .base import IterableCorpus


-class TarredCorpus(IterableDocumentCorpus):
+class TarredCorpus(IterableCorpus):
datatype_name = "tar"
# This may be overridden by subclasses to provide filters for documents applied before main doc processing
document_preprocessors = []
@@ -125,7 +125,7 @@ def data_ready(self):
return super(TarredCorpus, self).data_ready() and len(self.tar_filenames) > 0


-class TarredCorpusWriter(IterableDocumentCorpusWriter):
+class TarredCorpusWriter(IterableCorpusWriter):
"""
If gzip=True, each document is gzipped before adding it to the archive. Not the same as creating a tarball,
since the docs are gzipped *before* adding them, not the whole archive together, but it means we can easily
14 changes: 4 additions & 10 deletions src/python/pimlico/datatypes/xml.py
@@ -6,18 +6,15 @@
"""
import gzip
import os
import traceback
-from multiprocessing import Pool, Queue
+from multiprocessing import Pool

import time

from pimlico.core.modules.base import DependencyError
from pimlico.core.modules.execute import ModuleExecutionError
-from pimlico.datatypes.base import IterableDocumentCorpus, PimlicoDatatypeWriter
+from pimlico.datatypes.base import IterableCorpus, PimlicoDatatypeWriter
from pimlico.utils.progress import get_progress_bar


-class XmlDocumentIterator(IterableDocumentCorpus):
+class XmlDocumentIterator(IterableCorpus):
requires_data_preparation = True
input_module_options = [
("path", {
@@ -39,10 +36,7 @@ class XmlDocumentIterator(IterableDocumentCorpus):
]

def __iter__(self):
-        try:
-            from bs4 import BeautifulSoup
-        except ImportError:
-            raise DependencyError("BeautifulSoup could not be found. Have you run the make target in the lib dir?")
+        from bs4 import BeautifulSoup

if not os.path.isdir(self.path):
# Just a single file
8 changes: 4 additions & 4 deletions src/python/pimlico/modules/corpora/subset/info.py
@@ -7,12 +7,12 @@
from itertools import islice

from pimlico.core.modules.base import BaseModuleInfo
-from pimlico.datatypes.base import IterableDocumentCorpus
+from pimlico.datatypes.base import IterableCorpus


-class CorpusSubsetFilter(IterableDocumentCorpus):
+class CorpusSubsetFilter(IterableCorpus):
def __init__(self, pipeline, input_datatype, size, offset=0):
-        IterableDocumentCorpus.__init__(self, None, pipeline)
+        IterableCorpus.__init__(self, None, pipeline)

self.offset = offset
self.input_datatype = input_datatype
@@ -30,7 +30,7 @@ def data_ready(self):

class ModuleInfo(BaseModuleInfo):
module_type_name = "subset"
module_inputs = [("documents", IterableDocumentCorpus)]
module_inputs = [("documents", IterableCorpus)]
module_outputs = [("documents", CorpusSubsetFilter)]
module_options = [
("size", {
4 changes: 2 additions & 2 deletions src/python/pimlico/modules/corpora/tar/info.py
@@ -8,13 +8,13 @@
"""
from pimlico.core.modules.base import BaseModuleInfo
-from pimlico.datatypes.base import IterableDocumentCorpus
+from pimlico.datatypes.base import IterableCorpus
from pimlico.datatypes.tar import TarredCorpus


class ModuleInfo(BaseModuleInfo):
module_type_name = "tar"
module_inputs = [("documents", IterableDocumentCorpus)]
module_inputs = [("documents", IterableCorpus)]
module_outputs = [("documents", TarredCorpus)]
module_options = [
("archive_size", {
6 changes: 3 additions & 3 deletions src/python/pimlico/modules/corpora/tar_filter/info.py
@@ -6,14 +6,14 @@
import random
from pimlico.core.modules.base import BaseModuleInfo
from pimlico.core.modules.execute import ModuleNotReadyError
-from pimlico.datatypes.base import IterableDocumentCorpus
+from pimlico.datatypes.base import IterableCorpus
from pimlico.datatypes.tar import TarredCorpus


# Subclass TarredCorpus so that inputs expecting one can accept this
class TarredCorpusFilter(TarredCorpus):
def __init__(self, pipeline, input_datatype, archive_size, archive_basename="archive"):
-        IterableDocumentCorpus.__init__(self, None, pipeline)
+        IterableCorpus.__init__(self, None, pipeline)

self.archive_basename = archive_basename
self.input_datatype = input_datatype
@@ -96,7 +96,7 @@ def data_ready(self):

class ModuleInfo(BaseModuleInfo):
module_type_name = "tar_extract"
module_inputs = [("documents", IterableDocumentCorpus)]
module_inputs = [("documents", IterableCorpus)]
module_outputs = [("documents", TarredCorpusFilter)]
module_options = [
("archive_size", {
Empty file.
25 changes: 25 additions & 0 deletions src/python/pimlico/modules/features/vocab_mapper/exec.py
@@ -0,0 +1,25 @@
from pimlico.core.modules.base import BaseModuleExecutor
from pimlico.datatypes.base import InvalidDocument
from pimlico.datatypes.features import IndexedTermFeatureListCorpusWriter
from pimlico.utils.progress import get_progress_bar


class ModuleExecutor(BaseModuleExecutor):
def execute(self):
input_data = self.info.get_input("data")
self.log.info("Loading dictionaries")
term_vocab = self.info.get_input("term_vocab").get_data()
feature_vocab = self.info.get_input("feature_vocab").get_data()

pbar = get_progress_bar(len(input_data), title="Mapping")

# Prepare a writer for the output data
with IndexedTermFeatureListCorpusWriter(self.info.get_output_dir("data"), term_vocab, feature_vocab) as writer:
# Input is given for every document in a corpus
writer.add_data_points(
# Doc data consists of (term, feature count dict) pairs which we can pass straight to writer
(term, fcs)
for doc_name, document_data in pbar(input_data) if not isinstance(document_data, InvalidDocument)
for (term, fcs) in document_data
)
self.log.info("Mapper produced dataset with %d data points" % writer.metadata["length"])

