In [1]:
import sys
if "../" not in sys.path: sys.path.insert(0,"../");

## Building for the data module

## Processors

##### Testing

In [4]:
from pathlib import Path
from typing import Union, List
import os

from marynlp.text.processors.formatters import lowercase, remove_punctuations, white_space_cleaning
from marynlp.text.funcutils import forEach, filterBy, yield_forEach, apply, calls, rules

data_path = Path("../resources/data")
helsinki_na_path = data_path / Path("./hcs-na-v2")

# File to test out the concept
sample_file = helsinki_na_path / Path("./new-mat/bunge/han1-2004.shu")

@forEach(lowercase)  # performs the reformatting for each line
@filterBy(shu_file_line_rule)
@filterBy(helsinki_shufile_line_rule)
def load_lines_from_file(file_path: Union[str, os.PathLike]) -> List[str]:
    file_path = Path(file_path)
    assert file_path.exists(), "The file doesn't exits"
    
    with open(file_path, mode='r', encoding="utf8") as rb:
        return rb.readlines()

@yield_forEach(lowercase)  # performs the reformatting for each line
@filterBy(rules(shu_file_line_rule, helsinki_shufile_line_rule))
def gen_lines_from_file(file_path: Union[str, os.PathLike]) -> List[str]:
    file_path = Path(file_path)
    assert file_path.exists(), "The file doesn't exits"
    
    with open(file_path, mode='r', encoding="utf8") as rb:
        for line in rb:
            yield line

@apply(calls(lowercase, remove_punctuations))
def load_text_from_file(file_path: Union[str, os.PathLike]) -> str:
    file_path = Path(file_path)
    assert file_path.exists(), "The file doesn't exits"
    
    with open(file_path, mode='r', encoding="utf8") as rb:
        return rb.read()

# load_lines_from_file(sample_file)
# load_text_from_file(sample_file)
    
# for i in gen_lines_from_file(sample_file):
#     print(i)

## Flow the data

In [6]:
import re
from collections.abc import Callable
from typing import Union

from marynlp.text.data.objects import mask_token, token
from marynlp.text.processors.formatters import white_space_cleaning

def replace_text_to_token(input_: Union[str, token, mask_token], is_text: Callable[str], m_token: mask_token) -> Union[token, mask_token]:    
    # Check if the input is test
    if isinstance(input_, str):
        if is_text(input_):
            return m_token

    return input_
# str_.upper()
# replace_number("I have 3,000,000 million dollars") # fails
# replace_number("The distance is 23.45 kilometers") # fails

In [7]:
# from collections.abc import Callable
from functools import partial
from marynlp.text.data.objects import sentence
from marynlp.text.funcutils import calls

# regex_for_numbers = r'(?<!\S)(?=.)(0|([1-9](\d*|\d{0,2}(,\d{3})*)))?(\.\d*[1-9])?(?!\S)|(\d+)'
regex_for_numbers = r'(\d+)'

common_flags =  re.UNICODE | re.MULTILINE | re.DOTALL
number_re_ = re.compile(regex_for_numbers, common_flags)

def is_number(input_: str): return number_re_.match(input_) is not None
def mask_number_in_text(text: str):
    text_matched = number_re_.split(text)
    tokens = map(partial(replace_text_to_token, is_text=is_number, m_token=Vocab.NUM_TOKEN), text_matched)
    return tokens

# tokens = mask_number_in_text("my name is kevin. I am 23 years old");tokens

In [8]:
from typing import Any, Union
from marynlp.text.data.objects import mask_token, word
from marynlp.text.funcutils import calls, forEach, filterBy

base_characters = 'abcdefghijklmnoprstuvwyz'
base_numbers = '0123456789'
base_word_non_letter_chars = '\'-'

r_sw_word = r'([{}{}{}{}]+)'.format(base_characters, base_characters.upper(), base_numbers, base_word_non_letter_chars)
word_re_ = re.compile(r_sw_word, common_flags)

def is_swahili_word(input_: str): return word_re_.match(input_) is not None
def is_mask_token(input_: Union[Any, mask_token]) -> bool: return isinstance(input_, mask_token)
def tokenize_swahili_word(input_: Union[str, token]):
    if is_swahili_word(str(input_)):
        return token(input_)
    
    return input_
    
# def valid_text_rule(text: str) -> bool:
#     return len(text.strip()) > 0

    
# @forEach(tokenize_swahili_word, skip_rule=is_mask_token)
# @forEach(calls(mask_number_in_text), skip_rule=is_mask_token)
@filterBy(lambda text: len(text.strip()) > 0)  # only work with text that aren't empty | similar to `valid_text_rule`
def word_tokenize(text: str):
    return word_re_.split(text)

word_tokenize("my name is kevin. I am 23 years old") # [t'my', t'name', t'is', t'kevin', '. I ', t'am', <num>, t'years', t'old']

<filter at 0x7fb3f451a7d0>

In [9]:
# @forEach(mask)


In [10]:
t = "Lorem ipsum dolor sit amet, consectetur adipiscing elit."
word_tokenize(t)

<filter at 0x7fb3f450ba90>

In [11]:
text_ = """
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur ac bibendum augue. Fusce at nisi tortor. Morbi non ligula eu arcu hendrerit viverra eget non elit. Duis blandit ut lorem sit amet vulputate. Sed dignissim justo erat, vel posuere eros gravida eu. Sed auctor interdum gravida. Maecenas imperdiet at ante in placerat. Quisque faucibus blandit cursus. Praesent aliquam tempor magna. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Aenean lacinia lobortis facilisis.
"""

break_text_to_sentences(text_.strip(), max_length=120)

['Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur ac bibendum augue. Fusce at nisi tortor. Morbi non l',
 'igula eu arcu hendrerit viverra eget non elit. Duis blandit ut lorem sit amet vulputate. Sed dignissim justo erat, vel p',
 'osuere eros gravida eu. Sed auctor interdum gravida. Maecenas imperdiet at ante in placerat. Quisque faucibus blandit cu',
 'rsus. Praesent aliquam tempor magna. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia cu',
 'rae; Aenean lacinia lobortis facilisis.']

## Figuring out tokenization

Identifying how to tokenize a sentence. Doing so such that:

Original sentnece:
```
mwanafunzi anaenda shule
```

*Word level masking:*
```
mwanafunzi anaenda [MASK]
```
`[MASK]` - Masking that happens here is for 2 token (subword): `shule` -> `shu`, `le`

*Subword level masking:*
```
mwanafunzu ana<MASK> shule
```

In [19]:
import os
from typing import List, Union, Iterable
from marynlp.text.data.objects import mask_token, token

# objects
# ------------------------------------------

class Vocab(object):
    """
    This should represent the different information about the Vocabulary
    """
    UNK_TOKEN = mask_token('unk')
    NUM_TOKEN = mask_token('num')
    
    def __init__(self, list_tokens: List[token]):
        self.tokens = list_tokens
    
    def has(self, token_: Union[str, token]) -> bool:
        """Checks if the vocab object has the token"""
        if isinstance(token_, str):
            token_ = token(token_)
            
        for tok in self.tokens:
            if tok.get() == token_.get():
                return True
            
        return False
    
    def get_tokens(self):
        return list(self.tokens)
    
    def __len__(self):
        return len(self.tokens)
    
    @classmethod
    def from_list(cls, list_str: List[str]):
        raise NotImplementedError()

    @classmethod
    def from_file(cls, file_path: Union[str, os.PathLike]):
        raise NotImplementedError()
    
    def extra_repr(self):
        # print the values
        if len(self) > 5:
            t = self.get_tokens()
            return "{}, ..., {}".format(", ".join(t[:2]), t[-1])
        
        return ", ".join(t)
    
    def __repr__(self):
        return 'Vocab(%s, count=%d)' % (self.extra_repr(), len(self))

In [20]:
from __future__ import annotations
from collections import defaultdict, OrderedDict
from typing import List, Tuple, Any, Union

class Mapper(object):
    def __init__(self, od: OrderedDict):
        self._od = od
        
    @property
    def ordered_dict():
        """Get the ordered dict"""
        return self._od
    
    def map_(self, key: str):
        if key not in self._od:
            raise KeyError("Mapping key '%s' doesn't exist in the Mapper" % key)

        return self._od[key]
    
    def add(self, key: str, value: Any):
        assert key not in self._od, "Mapping key '%s' already exists" % key
        self._od[key] = value
        pass
    
    @classmethod
    def from_list_tuple(cls, list_o_tuple: Union[zip, List[Tuple[str, Any]]]):
        return cls(OrderedDict(list_o_tuple))
    
    @classmethod
    def from_dict(cls, dict_: Dict[str, Any]):
        return cls(OrderedDict(dict_))
    
    def extra_repr(self):
        return list(self._od.items())
    
    def __repr__(self):
        return "Mapper({})".format(self.extra_repr())

class Encoder(Mapper):    
    @classmethod
    def from_decoder(cls, decoder: Decoder):
        return Encoder(decoder.ordered_dict)

class Decoder(Mapper):
    @classmethod
    def from_encoder(cls, encoder: Encoder):
        return Decoder(encoder.ordered_dict)
        pass

items = sorted(list(set('anaenda'))); items
mapper = Mapper.from_list_tuple(zip(items, range(len(items))))
mapper.add('r', 6)

mapper

Mapper([('a', 0), ('d', 1), ('e', 2), ('n', 3), ('r', 6)])

In [None]:
class Tokenizer():
    pass

## Building pipeline

In [2]:
# selectors: filters
# -----------------------

def shu_file_line_rule(text: str) -> bool:

    # Check if there is a text that has <text
    if text.find("<text") >= 0: return False

    # Check if there is a text that has </text>
    if text.find("</text>") >= 0: return False
    
    return True

# Making selection of data
def content_width_line_rule(text: str) -> bool:
    """
    Selectors to choose the lines that work for downstream processing
    """
    
    # if line is less than 20, done select for processing
    if len(text) < 20: return False

    return True

def break_text_to_sentences(text: str, max_length: int = 120):
    assert isinstance(text, str), "text should be string"
    _l = len(text)
    sentences = []
    
    ix, end = 0, 0
    for i in range(_l):
        if (i + 1) % max_length == 0:
            ix, end = end, i
            sentences.append(text[ix:end])

    # pass last sentence
    ix, end = end, _l
    sentences.append(text[ix:end])
    
    return sentences

In [3]:
from typing import Union, List
import os

from marynlp.text.processors.formatters import lowercase, remove_punctuations, white_space_cleaning
from marynlp.text import funcutils as f

@f.flowBy(break_text_to_sentences)
@f.forEach(lowercase)  # performs the reformatting for each line
@f.filterBy(f.rules(shu_file_line_rule, content_width_line_rule))
def load_lines_from_file(file_path: Union[str, os.PathLike]) -> List[str]:
    file_path = Path(file_path)
    assert file_path.exists(), "The file doesn't exits"
    
    with open(file_path, mode='r', encoding="utf8") as rb:
        return rb.readlines()

In [10]:
from pathlib import Path

data_path = Path("../resources/data")
helsinki_na_path = data_path / Path("./hcs-na-v2")

# File to test out the concept
sample_file = helsinki_na_path / Path("./new-mat/bunge/han1-2004.shu")

load_lines_from_file(sample_file)

<generator object load_lines_from_file at 0x7ffb74222450>