Skip to content

Commit

Permalink
Generate documents explicitly in README/testing example; update testi…
Browse files Browse the repository at this point in the history
…ng script.
  • Loading branch information
lapets committed Jun 9, 2023
1 parent 97d4cf0 commit 49ef03b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 53 deletions.
28 changes: 18 additions & 10 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,34 @@ In addition to the `use case in a related article <https://github.com/python-sup
Word-Document Index
~~~~~~~~~~~~~~~~~~~

Assume there exists a collection of documents and that each document contains a collection of 7-letter "words". This example demonstrates how a dictionary that associates each word with the collection of documents in which that word appears can be built by leveraging `multiprocessing <https://docs.python.org/3/library/multiprocessing.html>`__ and the `MapReduce <https://en.wikipedia.org/wiki/MapReduce>`__ paradigm. Suppose the function definitions below are found within a module ``example.py``:
Assume there exists a collection of documents and that each document contains a collection of 7-character "words". This example demonstrates how a dictionary that associates each word with the collection of documents in which that word appears can be built by leveraging `multiprocessing <https://docs.python.org/3/library/multiprocessing.html>`__ and the `MapReduce <https://en.wikipedia.org/wiki/MapReduce>`__ paradigm. Suppose the function definitions below are found within a module ``example.py``:

.. code-block:: python
from random import choice
from string import ascii_lowercase
from uuid import uuid4
def word():
    """Generate a random 7-character 'word' of lowercase ASCII letters."""
    # Each of the seven positions is drawn independently, so repeated
    # characters within a word are possible.
    return ''.join(choice(ascii_lowercase) for _ in range(7))
def doc():
    """Generate a random 25-word 'document' and its identifier.

    Returns a pair ``(words, identifier)`` in which ``words`` is a list of
    25 strings produced by ``word()`` and ``identifier`` is the value
    returned by ``uuid4()``.
    """
    return ([word() for _ in range(25)], uuid4())
def word_to_doc_id_dict(doc):
    """Build a dictionary mapping a document's words to its identifier.

    ``doc`` is a pair ``(words, identifier)``. Every key of the result is
    one of the words and every value is a new singleton set holding the
    identifier.
    """
    (ws, identifier) = doc
    return {w: {identifier} for w in ws}
def docs():
    """Generate a list of 50 random 'documents', each built by ``doc()``."""
    return [doc() for _ in range(50)]
def merge_dicts(u, v):
    """Merge two dictionaries ``u`` and ``v`` whose values are sets.

    The result has every key found in either input; a key present in both
    maps to the union of the two sets. The union builds new sets, so
    neither input is modified.
    """
    return {w: (u.get(w, set()) | v.get(w, set())) for w in u.keys() | v.keys()}
def word_to_doc_id_dict(document):
    """Build a dictionary mapping the 'words' in a 'document' to its identifier.

    ``document`` is a pair ``(words, identifier)``. Every key of the result
    is one of the words and every value is a new singleton set holding the
    identifier.
    """
    (words, identifier) = document
    return {w: {identifier} for w in words}
def merge_dicts(d, e):
    """Merge two dictionaries ``d`` and ``e`` whose values are sets.

    The result has every key found in either input; a key present in both
    maps to the union of the two sets. The union builds new sets, so
    neither input is modified.
    """
    return {w: (d.get(w, set()) | e.get(w, set())) for w in d.keys() | e.keys()}
.. |pool| replace:: ``pool``
.. _pool: https://mr4mp.readthedocs.io/en/2.7.0/_source/mr4mp.html#mr4mp.mr4mp.pool
Expand All @@ -82,7 +90,7 @@ The code below (also included in ``example.py``) constructs a dictionary that ma
start = default_timer()
p = mr4mp.pool()
p.mapreduce(word_to_doc_id_dict, merge_dicts, [doc() for _ in range(100)])
p.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
p.close()
print(
"Finished in " + str(default_timer()-start) + "s " +
Expand Down
95 changes: 52 additions & 43 deletions test/test_mr4mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,34 @@ def test_module(self):
module = import_module('mr4mp.mr4mp')
self.assertTrue(api_methods().issubset(module.__dict__.keys()))

def word(index_doc, index_word):
    """Generate a random (but reproducible) three-character 'word'.

    The word is derived from the SHA-256 digest of the product
    ``index_doc * index_word``, so the same pair of indices always yields
    the same word. Each character is one of the first seven lowercase
    ASCII letters.

    Because only the product is hashed, ``word(a, b) == word(b, a)`` and
    every pair containing a zero index yields the same word.

    Raises ``OverflowError`` if the product is negative.
    """
    product = index_doc * index_word
    # Two bytes are used whenever they suffice, so words for products below
    # 65536 are unchanged; larger products get as many bytes as they need
    # instead of overflowing a fixed two-byte encoding.
    width = max(2, (product.bit_length() + 7) // 8)
    digest = sha256(product.to_bytes(width, 'little')).digest()
    return ''.join(ascii_lowercase[i % 7] for i in digest[:3])

def word_to_doc_id_dict(identifier):
    """Given a document identifier, map 25 random words to that identifier.

    The words come from ``word(identifier, k)`` for ``k`` in ``range(25)``;
    they are collected into a set first, so the result may have fewer than
    25 keys.
    """
    words = {word(identifier, k) for k in range(25)}
    return {w: {identifier} for w in words}
def doc(index_doc):
    """Generate a random (but reproducible) 'document' and its identifier.

    Returns a pair ``(words, identifier)``. ``words`` is a list of exactly
    25 entries; repeats are possible, so it may hold fewer than 25
    *distinct* words. ``identifier`` is the hexadecimal SHA-256 digest of
    ``index_doc`` encoded as little-endian bytes.

    Raises ``OverflowError`` if ``index_doc`` is negative.
    """
    # Two bytes are used whenever they suffice, so identifiers for indices
    # below 65536 are unchanged; larger indices get as many bytes as they
    # need instead of overflowing a fixed two-byte encoding.
    width = max(2, (index_doc.bit_length() + 7) // 8)
    return (
        [word(index_doc, index_word) for index_word in range(25)],
        sha256(index_doc.to_bytes(width, 'little')).hexdigest()
    )

def docs():
    """Generate a list of 50 random (but reproducible) 'documents'.

    The documents are ``doc(0)`` through ``doc(49)``, in that order.
    """
    return [doc(index_doc) for index_doc in range(50)]

def word_to_doc_id_dict(document):
    """Build a dictionary mapping the 'words' in a 'document' to its identifier.

    ``document`` is a pair ``(words, identifier)``. Every distinct word
    becomes a key whose value is a new singleton set holding the identifier.
    """
    (words, identifier) = document
    # Duplicates are dropped up front; each key still gets its own set.
    return {w: {identifier} for w in set(words)}

def merge_dicts(u, v):
    """Merge two dictionaries ``u`` and ``v`` whose values are sets.

    The result has every key found in either input; a key present in both
    maps to the union of the two sets. The union builds new sets, so
    neither input is modified.
    """
    return {w: (u.get(w, set()) | v.get(w, set())) for w in u.keys() | v.keys()}
def merge_dicts(d, e):
    """Merge two dictionaries ``d`` and ``e`` whose values are sets.

    The result has every key found in either input; a key present in both
    maps to the union of the two sets. The union builds new sets, so
    neither input is modified.
    """
    return {w: (d.get(w, set()) | e.get(w, set())) for w in d.keys() | e.keys()}

# Expected result, computed sequentially with ``map`` and ``reduce`` over the
# generated documents; the test cases compare their results against it.
result_reference = reduce(merge_dicts, map(word_to_doc_id_dict, docs()))

def add_one(x):
"""
Expand All @@ -66,7 +78,7 @@ def __call__(self, xs):
return self.logged

def to_list(self):
    """Return every logged item as one flat list.

    ``self.logged`` is read as an iterable of iterables; the items are
    returned in the order they appear, without sorting.
    """
    return [x for xs in self.logged for x in xs]

def define_class_pool_close(processes):
"""
Expand All @@ -83,56 +95,56 @@ def test_pool_mapreduce_pool_close(self):
self.assertFalse(pool.closed())
print("Starting.")
start = default_timer()
result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, range(50), close=False)
result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs(), close=False)
self.assertFalse(pool.closed())
result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, range(50))
result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
self.assertTrue(pool.closed())
print("Finished in " + str(default_timer()-start) +
"s using " + str(len(pool)) + " processes.")
self.assertEqual(result, result_reference)
self.assertDictEqual(result, result_reference)

def test_pool_mapreduce_function_close(self):
    """Check that ``close=True`` passed to ``mapreduce`` closes an open pool."""
    pool = mr4mp.pool(processes, close=False)
    self.assertFalse(pool.closed())
    print("Starting.")
    start = default_timer()
    # The first call explicitly leaves the pool open.
    result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs(), close=False)
    self.assertFalse(pool.closed())
    # The second call asks for the pool to be closed afterwards.
    result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs(), close=True)
    self.assertTrue(pool.closed())
    print("Finished in " + str(default_timer()-start) +
          "s using " + str(len(pool)) + " processes.")
    self.assertDictEqual(result, result_reference)

def test_pool_mapreduce_pool_open_reuse(self):
    """Check that a pool created with ``close=False`` can be reused until closed."""
    pool = mr4mp.pool(processes, close=False)
    # Three consecutive runs on the same pool.
    result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
    result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
    result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
    self.assertFalse(pool.closed())
    pool.close()
    self.assertTrue(pool.closed())
    self.assertDictEqual(result, result_reference)

def test_pool_mapreduce_pool_close_reuse_exception(self):
    """Check that reusing a pool created with ``close=True`` raises ``ValueError``."""
    pool = mr4mp.pool(processes, close=True)
    pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
    # A second run on the same pool is expected to be rejected.
    with self.assertRaises(ValueError):
        pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())

def test_pool_mapreduce_function_close_reuse_exception(self):
    """Check that a pool raises ``ValueError`` after a ``close=True`` run."""
    pool = mr4mp.pool(processes, close=False)
    pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs(), close=True)
    # A second run on the same pool is expected to be rejected.
    with self.assertRaises(ValueError):
        pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())

def test_pool_mapreduce_many_with_as(self):
    """Check repeated ``mapreduce`` runs on a pool used as a context manager."""
    with mr4mp.pool(processes) as pool:
        result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
        result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
        result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
        # NOTE(review): this check is placed inside the ``with`` block on the
        # assumption that leaving the block closes the pool -- confirm
        # against the original file's indentation.
        self.assertFalse(pool.closed())
    self.assertDictEqual(result, result_reference)

return Test_pool_close

Expand All @@ -151,17 +163,14 @@ def test_pool_mapreduce(self):
logger = log() if progress else None
pool = mr4mp.pool(processes, close=True)
result = pool.mapreduce(
word_to_doc_id_dict,
merge_dicts,
range(50),
stages=stages,
progress=logger
word_to_doc_id_dict, merge_dicts, docs(),
stages=stages, progress=logger
)
self.assertEqual(result, result_reference)
self.assertDictEqual(result, result_reference)
if progress:
self.assertEqual(
logger.to_list(),
list(range(50)) if stages is not None else []
docs() if stages is not None else []
)

def test_pool_mapconcat(self):
Expand Down Expand Up @@ -190,14 +199,14 @@ class Test_functions(TestCase):
def test_mapreduce(self):
    """Check ``mr4mp.mapreduce`` against the reference result and progress log."""
    logger = log() if progress else None
    result = mr4mp.mapreduce(
        word_to_doc_id_dict, merge_dicts, docs(),
        processes=processes, stages=stages, progress=logger
    )
    self.assertDictEqual(result, result_reference)
    if progress:
        # The log is expected to contain the documents only when ``stages``
        # is specified; otherwise it is expected to be empty.
        self.assertEqual(
            logger.to_list(),
            docs() if stages is not None else []
        )

def test_mapconcat(self):
Expand Down Expand Up @@ -235,19 +244,19 @@ def test_pool_mapreduce(self):
pool = mr4mp.pool(close=True)
print("Starting.")
start = default_timer()
result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, range(50))
result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
print("Finished in " + str(default_timer()-start) +
"s using " + str(len(pool)) + " processes.")
self.assertEqual(result, result_reference)
self.assertDictEqual(result, result_reference)

def test_pool_mapreduce_terminate(self):
    """Check that ``terminate`` leaves a default-constructed pool closed."""
    pool = mr4mp.pool()
    print("Starting.")
    start = default_timer()
    result = pool.mapreduce(word_to_doc_id_dict, merge_dicts, docs())
    print("Finished in " + str(default_timer()-start) +
          "s using " + str(len(pool)) + " processes.")
    self.assertDictEqual(result, result_reference)
    pool.terminate()
    self.assertTrue(pool.closed())

Expand Down

0 comments on commit 49ef03b

Please sign in to comment.