Commit

Added matrix building module to build feature matrices from extracted features.
Mark Granroth-Wilding committed Apr 6, 2016
1 parent eae8b00 commit 7fe67ed
Showing 7 changed files with 154 additions and 10 deletions.
9 changes: 5 additions & 4 deletions src/python/pimlico/core/modules/map/__init__.py
@@ -95,9 +95,10 @@ def execute(self):
  invalid_inputs = 0
  invalid_outputs = 0

- docs_completed, start_after = self.retrieve_processing_status()
+ docs_completed_now = 0
+ docs_completed_before, start_after = self.retrieve_processing_status()

- pbar = get_progress_bar(len(self.input_iterator) - docs_completed,
+ pbar = get_progress_bar(len(self.input_iterator) - docs_completed_before,
                          title="%s map" % self.info.module_type_name.replace("_", " ").capitalize())

  try:
@@ -134,8 +135,8 @@ def execute(self):
  writer.add_document(archive, filename, result)

  # Update the module's metadata to say that we've completed this document
- docs_completed += 1
- self.update_processing_status(docs_completed, archive, filename)
+ docs_completed_now += 1
+ self.update_processing_status(docs_completed_before+docs_completed_now, archive, filename)
  complete = True
  self.log.info("Input contained %d invalid documents, output contained %d" %
                (invalid_inputs, invalid_outputs))
12 changes: 7 additions & 5 deletions src/python/pimlico/core/parallel/map.py
@@ -96,9 +96,11 @@ def execute(self):
  docs_processing = []
  result_buffer = {}

- docs_completed, start_after = self.retrieve_processing_status()
+ docs_completed_now = 0
+ docs_completed_before, start_after = self.retrieve_processing_status()

- pbar = get_progress_bar(len(self.input_iterator) - docs_completed,
+ self.log.info("Processing %d documents" % (len(self.input_iterator) - docs_completed_before))
+ pbar = get_progress_bar(len(self.input_iterator) - docs_completed_before,
                          title="%s map" % self.info.module_type_name.replace("_", " ").capitalize())
  try:
      # Prepare a corpus writer for the output
@@ -150,9 +152,9 @@ def execute(self):
  writer.add_document(archive, filename, result)

  # Update the module's metadata to say that we've completed this document
- docs_completed += 1
- pbar.update(docs_completed)
- self.update_processing_status(docs_completed, archive, filename)
+ docs_completed_now += 1
+ pbar.update(docs_completed_now)
+ self.update_processing_status(docs_completed_before+docs_completed_now, archive, filename)

  pbar.finish()
  complete = True
86 changes: 86 additions & 0 deletions src/python/pimlico/datatypes/arrays.py
@@ -0,0 +1,86 @@
"""
Wrappers around Numpy arrays and Scipy sparse matrices.
"""
import os

from pimlico.datatypes.base import PimlicoDatatype, PimlicoDatatypeWriter


class NumpyArray(PimlicoDatatype):
def __init__(self, base_dir, pipeline, **kwargs):
super(NumpyArray, self).__init__(base_dir, pipeline, **kwargs)
self._array = None

@property
def array(self):
if self._array is None:
import numpy
with open(os.path.join(self.data_dir, "array.npy"), "r") as f:
self._array = numpy.load(f)
return self._array

def data_ready(self):
return super(NumpyArray, self).data_ready() and os.path.exists(os.path.join(self.data_dir, "array.npy"))

def check_runtime_dependencies(self):
missing_dependencies = []
try:
import numpy
except ImportError:
missing_dependencies.append(("Numpy", "install Numpy systemwide (e.g. package 'python-numpy' on Ubuntu)"))
missing_dependencies.extend(super(NumpyArray, self).check_runtime_dependencies())
return missing_dependencies


class NumpyArrayWriter(PimlicoDatatypeWriter):
def set_array(self, array):
import numpy
numpy.save(os.path.join(self.data_dir, "array.npy"), array)


class ScipySparseMatrix(PimlicoDatatype):
"""
Wrapper around Scipy sparse matrices. The matrix loaded is always in COO format -- you probably want to convert
to something else before using it. See scipy docs on sparse matrix conversions.
"""
def __init__(self, base_dir, pipeline, **kwargs):
super(ScipySparseMatrix, self).__init__(base_dir, pipeline, **kwargs)
self._array = None

@property
def array(self):
if self._array is None:
from scipy import io
self._array = io.mmread(os.path.join(self.data_dir, "array.mtx"))
return self._array

def data_ready(self):
return super(ScipySparseMatrix, self).data_ready() and os.path.exists(os.path.join(self.data_dir, "array.mtx"))

def check_runtime_dependencies(self):
missing_dependencies = []
try:
import numpy
except ImportError:
missing_dependencies.append(("Numpy", "install Numpy systemwide (e.g. package 'python-numpy' on Ubuntu)"))
try:
import scipy
except ImportError:
missing_dependencies.append(("Scipy", "install Scipy systemwide (e.g. package 'python-scipy' on Ubuntu)"))
missing_dependencies.extend(super(ScipySparseMatrix, self).check_runtime_dependencies())
return missing_dependencies


class ScipySparseMatrixWriter(PimlicoDatatypeWriter):
def set_matrix(self, mat):
from scipy.sparse import coo_matrix
from scipy.io import mmwrite

if type(mat) is not coo_matrix:
# If this isn't a COO matrix, try converting it
# Other scipy sparse matrix types and numpy dense arrays can all be converted in this way
mat = coo_matrix(mat)

mmwrite(os.path.join(self.data_dir, "array.mtx"), mat)
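
For orientation, a minimal usage sketch of the new datatypes (not part of the commit); output_dir and pipeline are hypothetical placeholders for values a surrounding Pimlico module would supply:

# A minimal sketch, not from the commit: write a sparse matrix with the new
# writer, then read it back and convert from COO as the docstring suggests.
from scipy.sparse import dok_matrix
from pimlico.datatypes.arrays import ScipySparseMatrix, ScipySparseMatrixWriter

counts = dok_matrix((3, 4))
counts[0, 1] = 2.0
counts[2, 3] = 5.0

with ScipySparseMatrixWriter(output_dir) as writer:
    writer.set_matrix(counts)        # converted to COO and saved as array.mtx

loaded = ScipySparseMatrix(output_dir, pipeline).array   # COO matrix read via scipy.io.mmread
csr = loaded.tocsr()                 # convert before slicing or arithmetic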
2 changes: 1 addition & 1 deletion src/python/pimlico/datatypes/features.py
@@ -277,7 +277,7 @@ def _read_binary_int(f, unpacker):
      raise StopIteration
  try:
      # Try decoding this as a single int
-     integer = unpacker.unpack(string)
+     integer = unpacker.unpack(string)[0]
  except struct.error, e:
      raise DatatypeLoadError("could not unpack integer data in file: corrupt data? %s" % e)
  return integer
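
The added [0] is needed because a struct unpacker always returns a tuple, even when the format describes a single value. A small illustration (the "<L" format is just an example, not necessarily the one this code uses):

import struct

unpacker = struct.Struct("<L")            # example format: one little-endian unsigned int
unpacker.unpack(unpacker.pack(42))        # -> (42,): unpack returns a tuple
unpacker.unpack(unpacker.pack(42))[0]     # -> 42: the integer itself, as the fix above extracts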
Empty file.
@@ -0,0 +1,27 @@
import numpy
from scipy.sparse.dok import dok_matrix

from pimlico.core.modules.base import BaseModuleExecutor
from pimlico.datatypes.arrays import ScipySparseMatrixWriter
from pimlico.utils.progress import get_progress_bar


class ModuleExecutor(BaseModuleExecutor):
def execute(self):
input_data = self.info.get_input("data")

self.log.info("Collecting features into a %d x %d sparse matrix from %d data points" %
(len(input_data.term_dictionary), len(input_data.feature_dictionary), len(input_data)))
pbar = get_progress_bar(len(input_data), title="Collecting")

matrix = dok_matrix((len(input_data.term_dictionary), len(input_data.feature_dictionary)), dtype=numpy.int32)
# Iterate over the input data and collect up counts from all instances of each term
for term, feature_counts in pbar(input_data):
for feature, count in feature_counts.items():
matrix[term, feature] += count

# Write out the matrix
self.log.info("Built matrix: writing to disk")
with ScipySparseMatrixWriter(self.info.get_output_dir("matrix")) as writer:
# Matrix will be converted to COO format before writing
writer.set_matrix(matrix)
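
A toy illustration (not from the commit) of what the accumulation loop above does: counts for the same (term, feature) cell are summed across data points, and the DOK matrix converts cleanly to COO when written:

import numpy
from scipy.sparse.dok import dok_matrix

# Hypothetical (term_id, feature_counts) pairs standing in for the input corpus
data = [(0, {1: 2}), (0, {1: 3}), (2, {0: 1})]
matrix = dok_matrix((3, 2), dtype=numpy.int32)
for term, feature_counts in data:
    for feature, count in feature_counts.items():
        matrix[term, feature] += count
print(matrix.tocoo())    # entries: (0, 1) -> 5 and (2, 0) -> 1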
@@ -0,0 +1,28 @@
from pimlico.core.modules.base import BaseModuleInfo
from pimlico.datatypes.arrays import ScipySparseMatrix
from pimlico.datatypes.features import IndexedTermFeatureListCorpus


class ModuleInfo(BaseModuleInfo):
module_type_name = "term_feature_matrix_builder"
module_inputs = [
("data", IndexedTermFeatureListCorpus)
]
module_outputs = [("matrix", ScipySparseMatrix)]
module_options = {}

def check_runtime_dependencies(self):
missing_dependencies = []
try:
import numpy
except ImportError:
missing_dependencies.append(("Numpy", self.module_name,
"install Numpy systemwide (e.g. package 'python-numpy' on Ubuntu)"))
try:
import scipy
except ImportError:
missing_dependencies.append(("Scipy", self.module_name,
"install Scipy systemwide (e.g. package 'python-scipy' on Ubuntu)"))

missing_dependencies.extend(super(ModuleInfo, self).check_runtime_dependencies())
return missing_dependencies
