In [None]:
%load_ext autoreload
%autoreload 2
import unittest
import utility
import types
import typing

In [125]:
def flatten(l):
    """Return one flat list holding the items of every iterable in `l`, in order."""
    return [x for ws in l for x in ws]


def generate_document(words):
    """Build a token list from `(count, word)` pairs.

    Each word is repeated `count` times (string repetition) and the resulting
    strings are flattened character by character, so a single-character word
    contributes one token per repetition and a count of 0 contributes nothing.
    """
    return flatten([n * w for n, w in words])


# Mocked corpus: one (filename, year, tokens) triple per document.
# `generate_document` must exist at module level here; it used to be defined
# only as a method on the class below, which made this cell fail with a
# NameError on a fresh kernel.
mock_corpus_data = [
    ('document_2013_1.txt', 2013, generate_document([(2,'a'), (1,'b'), (4,'c'), (1,'d')])),
    ('document_2013_2.txt', 2013, generate_document([(2,'a'), (2,'b'), (3,'c'), (0,'d')])),
    ('document_2014_1.txt', 2014, generate_document([(2,'a'), (3,'b'), (2,'c'), (0,'d')])),
    ('document_2014_2.txt', 2014, generate_document([(2,'a'), (4,'b'), (1,'c'), (1,'d')]))
]

class MockedProcessedCorpus():
    """In-memory stand-in for a processed corpus, built from `(filename, year, tokens)` triples."""

    def __init__(self, data):
        """Store the documents and derive the vocabulary and per-document token counts.

        Parameters
        ----------
        data : sequence of (filename, year, tokens) triples
            `tokens` is the list of tokens belonging to the document.
        """
        self.token_documents = data
        self.vocabulary = self.create_vocabulary()
        # The original line was an unfinished dict comprehension (`{ f: }`), a
        # syntax error. Filename -> token count is assumed here because that is
        # the shape `n_tokens` has in the CorpusTokenStream tests of this notebook.
        self.n_tokens = { filename: len(tokens) for filename, _, tokens in data }

    def create_vocabulary(self):
        """Return a dict mapping each distinct token to its index in sorted order."""
        distinct_tokens = { token for _, _, tokens in self.token_documents for token in tokens }
        return { token: index for index, token in enumerate(sorted(distinct_tokens)) }

    def documents(self):
        """Yield `(metadata, tokens)` per document; metadata carries `filename` and `year`."""
        for filename, year, tokens in self.token_documents:
            yield types.SimpleNamespace(filename=filename, year=year), tokens

    def generate_document(self, words):
        """Build a token list from `(count, word)` pairs.

        Each word is repeated `count` times and the repeated strings are
        flattened character by character.
        """
        return [ token for n, w in words for token in n * w ]

# Smoke check of the mock: show the derived vocabulary, then every
# (metadata, tokens) pair the corpus yields.
corpus = MockedProcessedCorpus(mock_corpus_data)
print(corpus.vocabulary)

for meta_and_tokens in corpus.documents():
    print(meta_and_tokens)

{'a': 0, 'b': 1, 'c': 2, 'd': 3}
(namespace(filename='document_2013_1.txt', year=2013), ['a', 'a', 'b', 'c', 'c', 'c', 'c', 'd'])
(namespace(filename='document_2013_2.txt', year=2013), ['a', 'a', 'b', 'b', 'c', 'c', 'c'])
(namespace(filename='document_2014_1.txt', year=2014), ['a', 'a', 'b', 'b', 'b', 'c', 'c'])
(namespace(filename='document_2014_2.txt', year=2014), ['a', 'a', 'b', 'b', 'b', 'b', 'c', 'd'])


In [38]:
import unittest
import utility

class test_TextFilesReader(unittest.TestCase):
    
    def test_archive_filenames_when_filter_txt_returns_txt_files(self):
        filename = '../data/test_corpus.zip'
        reader = utility.TextFilesReader(filename, pattern='*.txt')
        self.assertEqual(5, len(reader.archive_filenames))

    def test_archive_filenames_when_filter_md_returns_md_files(self):
        filename = '../data/test_corpus.zip'
        reader = utility.TextFilesReader(filename, pattern='*.md')
        self.assertEqual(1, len(reader.archive_filenames))

    def test_archive_filenames_when_filter_function_txt_returns_txt_files(self):
        filename = '../data/test_corpus.zip'
        itemfilter = lambda _, x: x.endswith('txt')
        reader = utility.TextFilesReader(filename, itemfilter=itemfilter)
        self.assertEqual(5, len(reader.archive_filenames))

    def test_get_file_when_default_returns_unmodified_content(self):
        filename = '../data/test_corpus.zip'
        document_name = 'dikt_2019_01_test.txt'
        reader = utility.TextFilesReader(filename, compress_whitespaces=False, dehyphen=True)
        result = next(reader.get_file(document_name))
        expected = "Tre svarta ekar ur snön.\r\n" + \
                   "Så grova, men fingerfärdiga.\r\n" + \
                   "Ur deras väldiga flaskor\r\n" + \
                   "ska grönskan skumma i vår."
        self.assertEqual(document_name, result[0])
        self.assertEqual(expected, result[1])
        
    def test_can_get_file_when_compress_whitespace_is_true_strips_whitespaces(self):
        filename = '../data/test_corpus.zip'
        document_name = 'dikt_2019_01_test.txt'
        reader = utility.TextFilesReader(filename, compress_whitespaces=True, dehyphen=True)
        result = next(reader.get_file(document_name))
        expected = "Tre svarta ekar ur snön. " + \
                   "Så grova, men fingerfärdiga. " + \
                   "Ur deras väldiga flaskor " + \
                   "ska grönskan skumma i vår."
        self.assertEqual(document_name, result[0])
        self.assertEqual(expected, result[1])

    def test_get_file_when_dehyphen_is_trye_removes_hyphens(self):
        filename = '../data/test_corpus.zip'
        document_name = 'dikt_2019_03_test.txt'
        reader = utility.TextFilesReader(filename, compress_whitespaces=True, dehyphen=True)
        result = next(reader.get_file(document_name))
        expected = "Nordlig storm. Det är den i den tid när rönnbärsklasar mognar. Vaken i mörkret hör man " + \
                   "stjärnbilderna stampa i sina spiltor " + \
                   "högt över trädet"
        self.assertEqual(document_name, result[0])
        self.assertEqual(expected, result[1])
        
    def test_get_file_when_file_exists_and_extractor_specified_returns_content_and_metadat(self):
        filename = '../data/test_corpus.zip'
        document_name = 'dikt_2019_03_test.txt'
        meta_extract = dict(year=r".{5}(\d{4})\_.*", serial_no=".{9}\_(\d+).*")
        reader = utility.TextFilesReader(filename, meta_extract=meta_extract, compress_whitespaces=True, dehyphen=True)
        result = next(reader.get_file(document_name))
        expected = "Nordlig storm. Det är den i den tid när rönnbärsklasar mognar. Vaken i mörkret hör man " + \
                   "stjärnbilderna stampa i sina spiltor " + \
                   "högt över trädet"
        self.assertEqual(document_name, result[0].filename)
        self.assertEqual(2019, result[0].year)
        self.assertEqual(3, result[0].serial_no)
        self.assertEqual(expected, result[1])
        
    def test_get_index_when_extractor_passed_returns_metadata(self):
        filename = '../data/test_corpus.zip'
        meta_extract = dict(year=r".{5}(\d{4})\_.*", serial_no=".{9}\_(\d+).*")
        reader = utility.TextFilesReader(filename, meta_extract=meta_extract, compress_whitespaces=True, dehyphen=True)
        result = reader.metadata
        expected = [
            types.SimpleNamespace(filename='dikt_2019_01_test.txt', serial_no=1, year=2019),
            types.SimpleNamespace(filename='dikt_2019_02_test.txt', serial_no=2, year=2019),
            types.SimpleNamespace(filename='dikt_2019_03_test.txt', serial_no=3, year=2019),
            types.SimpleNamespace(filename='dikt_2020_01_test.txt', serial_no=1, year=2020),
            types.SimpleNamespace(filename='dikt_2020_02_test.txt', serial_no=2, year=2020)]
        
        self.assertEqual(len(expected), len(result))
        for i in range(0,len(expected)):
            self.assertEqual(expected[i], result[i])

class test_Utilities(unittest.TestCase):
    
    def setUp(self):
        pass
 
    def test_dehypen(self):

        text = 'absdef\n'
        result = utility.dehyphen(text)
        self.assertEqual(text, result)

        text = 'abs-def\n'
        result = utility.dehyphen(text)
        self.assertEqual(text, result)
        
        text = 'abs - def\n'
        result = utility.dehyphen(text)
        self.assertEqual(text, result)
        
        text = 'abs-\ndef'
        result = utility.dehyphen(text)
        self.assertEqual('absdef\n', result)
        
        text = 'abs- \r\n def'
        result = utility.dehyphen(text)
        self.assertEqual('absdef\n', result)
    
    def test_compress_whitespaces(self):
        
        text = 'absdef\n'
        result = utility.compress_whitespaces(text)
        self.assertEqual('absdef', result)

        text = ' absdef \n'
        result = utility.compress_whitespaces(text)
        self.assertEqual( 'absdef', result)
        
        text = 'abs  def'
        result = utility.compress_whitespaces(text)
        self.assertEqual('abs def', result)
        
        text = 'abs\n def'
        result = utility.compress_whitespaces(text)
        self.assertEqual('abs def', result)
        
        text = 'abs- \r\n def'
        result = utility.compress_whitespaces(text)
        self.assertEqual('abs- def', result)
        
class Test_ExtractMeta(unittest.TestCase):
 
    def test_extract_metadata_when_valid_regexp_returns_metadata_values(self):
        filename = 'SOU 1957_5 Namn.txt'
        meta = utility.extract_metadata(filename, year=r".{4}(\d{4})\_.*", serial_no=".{8}\_(\d+).*")
        self.assertEqual(5, meta.serial_no)
        self.assertEqual(1957, meta.year)

    def test_extract_metadata_when_invalid_regexp_returns_none(self):
        filename = 'xyz.txt'
        meta = utility.extract_metadata(filename, value=r".{4}(\d{4})\_.*")
        self.assertEqual(None, meta.value)
        
# Run the TestCase classes currently defined in the notebook namespace.
# `argv` is replaced so unittest does not try to parse the kernel's own command
# line, `exit=False` keeps the kernel alive afterwards, and the trailing `;`
# stops the returned TestProgram object from being echoed as the cell output.
unittest.main(argv=['first-arg-is-ignored'], exit=False);

..............
----------------------------------------------------------------------
Ran 14 tests in 0.069s

OK


<unittest.main.TestProgram at 0x7f43c4cbc9b0>

In [34]:
import text_corpus
import nltk.tokenize

class Test_CorpusTextStream(unittest.TestCase):
 
    def test_next_document_when_new_corpus_returns_document(self):
        filename = '../data/test_corpus.zip'
        reader = utility.TextFilesReader(filename, compress_whitespaces=True, dehyphen=True)
        corpus = text_corpus.CorpusTextStream(reader)
        result = next(corpus.documents())
        expected = "Tre svarta ekar ur snön. " + \
                   "Så grova, men fingerfärdiga. " + \
                   "Ur deras väldiga flaskor " + \
                   "ska grönskan skumma i vår."
        self.assertEqual(expected, result[1])

    def test_get_index_when_extract_passed_returns_metadata(self):
        filename = '../data/test_corpus.zip'
        meta_extract = dict(year=r".{5}(\d{4})\_.*", serial_no=".{9}\_(\d+).*")
        reader = utility.TextFilesReader(filename, meta_extract=meta_extract, compress_whitespaces=True, dehyphen=True)
        corpus = text_corpus.CorpusTextStream(reader)
        result = corpus.get_index()
        expected = [
            types.SimpleNamespace(filename='dikt_2019_01_test.txt', serial_no=1, year=2019),
            types.SimpleNamespace(filename='dikt_2019_02_test.txt', serial_no=2, year=2019),
            types.SimpleNamespace(filename='dikt_2019_03_test.txt', serial_no=3, year=2019),
            types.SimpleNamespace(filename='dikt_2020_01_test.txt', serial_no=1, year=2020),
            types.SimpleNamespace(filename='dikt_2020_02_test.txt', serial_no=2, year=2020)
        ]
        self.assertEqual(len(expected), len(result))
        for i in range(0,len(expected)):
            self.assertEqual(expected[i], result[i])
            
    def test_get_index_when_no_extract_passed_returns_none(self):
        filename = '../data/test_corpus.zip'
        reader = utility.TextFilesReader(filename, meta_extract=None, compress_whitespaces=True, dehyphen=True)
        corpus = text_corpus.CorpusTextStream(reader)
        result = corpus.get_index()
        self.assertIsNone(result)
        
# Run the TestCase classes currently defined in the notebook namespace.
# `argv` is replaced so unittest does not try to parse the kernel's own command
# line, `exit=False` keeps the kernel alive afterwards, and the trailing `;`
# stops the returned TestProgram object from being echoed as the cell output.
unittest.main(argv=['first-arg-is-ignored'], exit=False);
    

.............
----------------------------------------------------------------------
Ran 13 tests in 0.072s

OK


<unittest.main.TestProgram at 0x7f43c4d0d0f0>

In [105]:
class Test_CorpusTokenStream(unittest.TestCase):
    
    def create_reader(self, compress_whitespaces=True, dehyphen=True, meta_extract=None):
        filename = '../data/test_corpus.zip'
        #meta_extract = dict(year=r".{5}(\d{4})\_.*", serial_no=".{9}\_(\d+).*")
        reader = utility.TextFilesReader(filename, meta_extract=meta_extract, compress_whitespaces=compress_whitespaces, dehyphen=dehyphen)
        return reader
        
    def test_next_document_when_token_corpus_returns_tokenized_document(self):
        reader = reader = self.create_reader()
        corpus = text_corpus.CorpusTokenStream(reader, isalnum=False)
        _, tokens = next(corpus.documents())
        expected = ["Tre", "svarta", "ekar", "ur", "snön", ".",
                    "Så", "grova", ",", "men", "fingerfärdiga", ".",
                    "Ur", "deras", "väldiga", "flaskor",
                    "ska", "grönskan", "skumma", "i", "vår", "."]
        self.assertEqual(expected, tokens)
        
    def test_next_document_when_isalnum_true_skips_deliminators(self):
        reader = self.create_reader()
        corpus = text_corpus.CorpusTokenStream(reader, isalnum=True)
        _, tokens = next(corpus.documents())
        expected = ["Tre", "svarta", "ekar", "ur", "snön",
                    "Så", "grova", "men", "fingerfärdiga",
                    "Ur", "deras", "väldiga", "flaskor",
                    "ska", "grönskan", "skumma", "i", "vår"]
        self.assertEqual(expected, tokens)
        
    def test_get_index_when_extract_passed_returns_expected_count(self):
        reader = self.create_reader(meta_extract=dict(year=r".{5}(\d{4})\_.*", serial_no=".{9}\_(\d+).*"))
        corpus = text_corpus.CorpusTokenStream(reader)
        result = corpus.get_index()
        self.assertEqual(5, len(result))
        
    def test_n_tokens_when_exhausted_iterater_returns_expected_count(self):
        reader = self.create_reader()
        corpus = text_corpus.CorpusTokenStream(reader, isalnum=False)
        r_n_tokens = {}
        for filename, tokens in corpus.documents():
            r_n_tokens[filename] = len(tokens)
        n_tokens = corpus.n_tokens
        expected = {
            'dikt_2019_01_test.txt': 22,
            'dikt_2019_02_test.txt': 16,
            'dikt_2019_03_test.txt': 26,
            'dikt_2020_01_test.txt': 45,
            'dikt_2020_02_test.txt': 21
        }
        self.assertEqual(expected, n_tokens)
        self.assertEqual(expected, r_n_tokens)

    def test_n_tokens_when_exhausted_and_isalnum_is_true_returns_expected_count(self):
        reader = self.create_reader()
        corpus = text_corpus.CorpusTokenStream(reader, isalnum=True)
        r_n_tokens = {}
        for filename, tokens in corpus.documents():
            r_n_tokens[filename] = len(tokens)
        n_tokens = corpus.n_tokens
        expected = {
            'dikt_2019_01_test.txt': 18,
            'dikt_2019_02_test.txt': 14,
            'dikt_2019_03_test.txt': 24,
            'dikt_2020_01_test.txt': 42,
            'dikt_2020_02_test.txt': 18
        }
        self.assertEqual(expected, n_tokens)
        self.assertEqual(expected, r_n_tokens)
        
# Run the TestCase classes currently defined in the notebook namespace.
# `argv` is replaced so unittest does not try to parse the kernel's own command
# line, `exit=False` keeps the kernel alive afterwards, and the trailing `;`
# stops the returned TestProgram object from being echoed as the cell output.
unittest.main(argv=['first-arg-is-ignored'], exit=False);

..........................
----------------------------------------------------------------------
Ran 26 tests in 0.231s

OK


<unittest.main.TestProgram at 0x7f43c3a86358>

In [104]:
class Test_ProcessedCorpus(unittest.TestCase):
    
    def setUp(self):
        pass
    
    def create_reader(self):
        filename = '../data/test_corpus.zip'
        meta_extract = dict(year=r".{5}(\d{4})\_.*", serial_no=".{9}\_(\d+).*")
        reader = utility.TextFilesReader(filename, meta_extract=meta_extract, compress_whitespaces=True, dehyphen=True)
        return reader
    
    def test_next_document_when_isalnum_is_true_returns_all_tokens(self):
        reader = self.create_reader()
        kwargs = dict(isalnum=False, to_lower=False, deacc=False, min_len=1, max_len=None, numerals=True)
        corpus = text_corpus.ProcessedCorpus(reader, **kwargs)
        _, tokens = next(corpus.documents())
        expected = ["Tre", "svarta", "ekar", "ur", "snön", ".",
                    "Så", "grova", ",", "men", "fingerfärdiga", ".",
                    "Ur", "deras", "väldiga", "flaskor",
                    "ska", "grönskan", "skumma", "i", "vår", "."]
        self.assertEqual(expected, tokens)
        
    def test_next_document_when_isalnum_true_skips_deliminators(self):
        reader = self.create_reader()
        kwargs = dict(isalnum=True, to_lower=False, deacc=False, min_len=1, max_len=None, numerals=True)
        corpus = text_corpus.ProcessedCorpus(reader, **kwargs)
        _, tokens = next(corpus.documents())
        expected = ["Tre", "svarta", "ekar", "ur", "snön",
                    "Så", "grova", "men", "fingerfärdiga",
                    "Ur", "deras", "väldiga", "flaskor",
                    "ska", "grönskan", "skumma", "i", "vår"]
        self.assertEqual(expected, tokens)
        
    def test_next_document_when_to_lower_is_true_returns_all_lowercase(self):
        reader = self.create_reader()
        kwargs = dict(isalnum=True, to_lower=True, deacc=False, min_len=1, max_len=None, numerals=True)
        corpus = text_corpus.ProcessedCorpus(reader, **kwargs)
        _, tokens = next(corpus.documents())
        expected = ["tre", "svarta", "ekar", "ur", "snön",
                    "så", "grova", "men", "fingerfärdiga",
                    "ur", "deras", "väldiga", "flaskor",
                    "ska", "grönskan", "skumma", "i", "vår"]
        self.assertEqual(expected, tokens)
        
    def test_next_document_when_min_len_is_two_returns_single_char_words_filtered_out(self):
        reader = self.create_reader()
        kwargs = dict(isalnum=True, to_lower=True, deacc=False, min_len=2, max_len=None, numerals=True)
        corpus = text_corpus.ProcessedCorpus(reader, **kwargs)
        _, tokens = next(corpus.documents())
        expected = ["tre", "svarta", "ekar", "ur", "snön",
                    "så", "grova", "men", "fingerfärdiga",
                    "ur", "deras", "väldiga", "flaskor",
                    "ska", "grönskan", "skumma", "vår"]
        self.assertEqual(expected, tokens)
        
    def test_next_document_when_max_len_is_six_returns_filter_out_longer_words(self):
        reader = self.create_reader()
        kwargs = dict(isalnum=True, to_lower=True, deacc=False, min_len=2, max_len=6, numerals=True)
        corpus = text_corpus.ProcessedCorpus(reader, **kwargs)
        _, tokens = next(corpus.documents())
        expected = ["tre", "svarta", "ekar", "ur", "snön",
                    "så", "grova", "men", 
                    "ur", "deras", 
                    "ska", "skumma", "vår"]
        self.assertEqual(expected, tokens)
        
    def test_get_index_when_extract_passed_returns_expected_count(self):
        reader = self.create_reader()
        kwargs = dict(isalnum=False, to_lower=False, deacc=False, min_len=2, max_len=None, numerals=True)
        corpus = text_corpus.ProcessedCorpus(reader, **kwargs)
        result = corpus.get_index()
        self.assertEqual(5, len(result))
        
    def test_n_tokens_when_exhausted_and_isalnum_min_len_two_returns_expected_count(self):
        reader = self.create_reader()
        corpus = text_corpus.ProcessedCorpus(reader, isalnum=True, min_len=2)
        r_tokens = {}
        for filename, tokens in corpus.documents():
            r_tokens[filename] = len(tokens)
        n_tokens = corpus.n_raw_tokens
        n_expected = {
            'dikt_2019_01_test.txt': 18,
            'dikt_2019_02_test.txt': 14,
            'dikt_2019_03_test.txt': 24,
            'dikt_2020_01_test.txt': 42,
            'dikt_2020_02_test.txt': 18
        }
        p_tokens = corpus.n_tokens
        p_expected = {
            'dikt_2019_01_test.txt': 17,
            'dikt_2019_02_test.txt': 13,
            'dikt_2019_03_test.txt': 21,
            'dikt_2020_01_test.txt': 42,
            'dikt_2020_02_test.txt': 18
        }
        self.assertEqual(n_expected, n_tokens)
        self.assertEqual(p_expected, p_tokens)
        self.assertEqual(p_expected, r_tokens)
        
# Run the TestCase classes currently defined in the notebook namespace.
# `argv` is replaced so unittest does not try to parse the kernel's own command
# line, `exit=False` keeps the kernel alive afterwards, and the trailing `;`
# stops the returned TestProgram object from being echoed as the cell output.
unittest.main(argv=['first-arg-is-ignored'], exit=False);

..........................
----------------------------------------------------------------------
Ran 26 tests in 0.220s

OK


<unittest.main.TestProgram at 0x7f43c309ff60>