# Markov Chain

## Purpose

The purpose of this notbook is to investigate the Markov Chain algorithm, focused on use as a generative model.

## References

- Project Inspiration: [Nazarko](https://towardsdatascience.com/text-generation-gpt-2-lstm-markov-chain-9ea371820e1e), [Rizvi](https://medium.com/analytics-vidhya/a-comprehensive-guide-to-build-your-own-language-model-in-python-5141b3917d6d)
- Markov Chain Explanations: [Powell](https://setosa.io/ev/markov-chains/), [Soni](https://towardsdatascience.com/introduction-to-markov-chains-50da3645a50d), [Strika](https://towardsdatascience.com/markov-chains-how-to-train-text-generation-to-write-like-george-r-r-martin-cdc42786e4b6)
- Python Help: [Inheritance](https://realpython.com/python-interface/), checking [attributes](https://ioflood.com/blog/python-hasattr/), [generic typing](https://stackoverflow.com/questions/63056549/inheriting-from-a-generic-abstract-class-with-a-concrete-type-parameter-is-not-e) for init, unit [testing in jupyter](https://jupyter-tutorial.readthedocs.io/en/stable/notebook/testing/unittest.html) notebooks
- Matrices & Normalization:
    - [coo](https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.coo_matrix.html) & [csr](https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csr_matrix.html) matrices
    - sklearn [normalize](https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.normalize.html)
- Gotchas:
    - Do [not use](https://docs.python.org/3/library/abc.html#abc.abstractproperty) instance variables as validation for ABCs. User getter/setter properties.
- ChatGPT


In [3]:
# imports
import re
import unittest
import warnings
from abc import ABC, abstractmethod
from sys import getsizeof
from typing import Any, Dict, Generic, List, Tuple, TypeVar

import numpy as np
from scipy.sparse import coo_matrix, csr_matrix
from sklearn.preprocessing import normalize

T = TypeVar("T")

In [4]:
# Markov-related Tokenizer Interface. Intended for different data types. (e.g., arrays, strings, text, numbers, etc.)

class SequenceTokenizerInterface(ABC, Generic[T]):
    @classmethod
    def __subclasshook__(cls, subclass):
        return (hasattr(subclass, '__init__') and 
                hasattr(subclass, 'raw') and 
                hasattr(subclass, 'content') and 
                hasattr(subclass, 'tokens') and 
                hasattr(subclass, 'mapping') and 
                hasattr(subclass, 'tokenize') and 
                callable(subclass.tokenize) and
                hasattr(subclass, 'preprocess') and 
                callable(subclass.preprocess) and
                hasattr(subclass, 'total_tokens') and 
                callable(subclass.total_tokens) and
                hasattr(subclass, 'total_unique_tokens') and 
                callable(subclass.total_unique_tokens))

    @abstractmethod
    def __init__(self, content: T) -> None:
        """Constructor"""
        raise NotImplementedError

    @property
    @abstractmethod
    def raw(self) -> T:
        """The raw content"""
        raise NotImplementedError
    
    @property
    @abstractmethod
    def content(self) -> T:
        """The digested content from raw"""
        raise NotImplementedError
    
    @property
    @abstractmethod
    def tokens(self) -> List[T]:
        """The tokens from content"""
        raise NotImplementedError
    
    @property
    @abstractmethod
    def mapping(self) -> Dict[T, int]:
        """The index mappings for unique tokens"""
        raise NotImplementedError

    @abstractmethod
    def preprocess(self) -> T:
        """Preprocess the raw content object before tokenization"""
        raise NotImplementedError
    
    @abstractmethod
    def tokenize(self) -> Tuple[List[str], Dict[T, int]]:
        """Tokenize the content object"""
        raise NotImplementedError

    @abstractmethod
    def total_tokens(self) -> int:
        """Get total tokens generated during tokenization"""
        raise NotImplementedError

    @abstractmethod
    def total_unique_tokens(self) -> int:
        """Get total unique tokens generated during tokenization"""
        raise NotImplementedError


In [5]:
# Tokenizer for long text content (not individual characters in a str)

class NLPTextTokens(SequenceTokenizerInterface[str]):

    def __init__(self, content: str) -> None:
        if not isinstance(content, str):
            raise TypeError("content is not a string.")
        
        self._raw: str = content
        self._content: str = self.preprocess()
        self._tokens: List[str]
        self._mapping: Dict[str, int]
        self._tokens, self._mapping = self.tokenize(self._content)

    @property
    def raw(self) -> str:
        return self._raw

    @property
    def content(self) -> str:
        return self._content

    @property
    def tokens(self) -> List[str]:
        return self._tokens
    
    @property
    def mapping(self) -> Dict[str, int]:
        return self._mapping
    
    def preprocess(self) -> str:
        punctuation_pad = '!?.,:-;'
        punctuation_remove = '"()_\n'

        content_preprocess = re.sub(r'(\S)(\n)(\S)', r'\1 \2 \3', self._raw)
        content_preprocess = content_preprocess.translate(str.maketrans('', '', punctuation_remove))
        content_preprocess = content_preprocess.translate(
            str.maketrans({k: f' {k} ' for k in punctuation_pad}))
        content_preprocess = re.sub(' +', ' ', content_preprocess)
        content_preprocess = content_preprocess.strip()

        return content_preprocess

    @staticmethod
    def tokenize(content: List[str]) -> Tuple[List[str], Dict[str, int]]:
        content_list = content.split(' ')
        content_set = list(set(content_list))
        content_set.sort()
        content_set.append('<| unknown |>')
        content_dict = { v: i for i, v in enumerate(content_set) }

        return content_list, content_dict

    def total_tokens(self) -> int:
        return len(self._tokens) or 0

    def total_unique_tokens(self) -> int:
        return len(self._mapping.keys()) or 0

    def info(self) -> None:
        print(f'Total Size: {getsizeof(self.raw)}b\nTotal Tokens: {self.total_tokens()}\nTotal Distinct Tokens: {self.total_unique_tokens()}')

text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
t = NLPTextTokens(text)
print(t.tokens)
print(t.mapping)
t.info()

['Lorem', 'ipsum', 'dolor', 'sit', 'amet', ',', 'consectetur', 'adipiscing', 'elit', ',', 'sed', 'do', 'eiusmod', 'tempor', 'incididunt', 'ut', 'labore', 'et', 'dolore', 'magna', 'aliqua', '.', 'Ut', 'enim', 'ad', 'minim', 'veniam', ',', 'quis', 'nostrud', 'exercitation', 'ullamco', 'laboris', 'nisi', 'ut', 'aliquip', 'ex', 'ea', 'commodo', 'consequat', '.', 'Duis', 'aute', 'irure', 'dolor', 'in', 'reprehenderit', 'in', 'voluptate', 'velit', 'esse', 'cillum', 'dolore', 'eu', 'fugiat', 'nulla', 'pariatur', '.', 'Excepteur', 'sint', 'occaecat', 'cupidatat', 'non', 'proident', ',', 'sunt', 'in', 'culpa', 'qui', 'officia', 'deserunt', 'mollit', 'anim', 'id', 'est', 'laborum', '.']
{',': 0, '.': 1, 'Duis': 2, 'Excepteur': 3, 'Lorem': 4, 'Ut': 5, 'ad': 6, 'adipiscing': 7, 'aliqua': 8, 'aliquip': 9, 'amet': 10, 'anim': 11, 'aute': 12, 'cillum': 13, 'commodo': 14, 'consectetur': 15, 'consequat': 16, 'culpa': 17, 'cupidatat': 18, 'deserunt': 19, 'do': 20, 'dolor': 21, 'dolore': 22, 'ea': 23, 'e

In [6]:
# Markov-related State Space

class StateSpaceInterface(ABC, Generic[T]):
    @classmethod
    def __subclasshook__(cls, subclass):
        return (hasattr(subclass, 'content') and
                hasattr(subclass, 'state_space') and
                hasattr(subclass, 'transition_matrix') and
                hasattr(subclass, 'transition_probability_matrix') and
                hasattr(subclass, 'generate_state_space') and 
                callable(subclass.generate_state_space) and
                hasattr(subclass, 'generate_transition_matrix') and 
                callable(subclass.generate_transition_matrix) and
                hasattr(subclass, 'generate_transition_matrix_prob') and 
                callable(subclass.generate_transition_matrix_prob))

    @abstractmethod
    def __init__(self, content: T, *args, **kwargs) -> None:
        """Constructor"""
        raise NotImplementedError

    @property
    @abstractmethod
    def content(self) -> T:
        """The content from which to create a state space"""
        raise NotImplementedError

    @property
    @abstractmethod
    def state_space(self) -> List[Any]:
        """The state space items from the content"""
        raise NotImplementedError

    @property
    @abstractmethod
    def transition_matrix(self) -> coo_matrix:
        """The transition matrix of the state space"""
        raise NotImplementedError

    @property
    @abstractmethod
    def transition_probability_matrix(self) -> csr_matrix:
        """The transition matrix of the state space"""
        raise NotImplementedError

    @abstractmethod
    def generate_state_space(self) -> Dict:
        """Generate the state space"""
        raise NotImplementedError

    @abstractmethod
    def generate_transition_matrix(self) -> coo_matrix:
        """Generate the transition matrix for every state"""
        raise NotImplementedError
        
    @abstractmethod
    def generate_transition_matrix_prob(self) -> csr_matrix:
        """Generate the transition matrix for every state"""
        raise NotImplementedError

In [7]:
# NLP Text StateSpace

class NLPTextStateSpace(StateSpaceInterface[NLPTextTokens]):

    @staticmethod
    def create_from_text(text: str, n: int = 2):
        try:
            text_tokens = NLPTextTokens(text)
            return NLPTextStateSpace(text_tokens, n)
        except Exception:
            raise ValueError("Exception returned while creating NLP Text Tokenizer")

    def __init__(self, content: NLPTextTokens, n: int = 2) -> None:
        if not isinstance(content, NLPTextTokens):
            raise TypeError("content is not and object of class NLPTextTokens.")
        if not (2 <= n <= 5):
            raise ValueError("n must be between 2 and 5, inclusive")

        self.n = n
        self._content = content
        self._n_grams = self.generate_ngrams()
        self._state_space = self.generate_state_space()
        self._transition_matrix = self.generate_transition_matrix()
        self._transition_matrix_prob = self.generate_transition_matrix_prob()

    @property
    def content(self) -> NLPTextTokens:
        return self._content
    
    @property
    def state_space(self) -> List[str]:
        return self._state_space
    
    @property
    def transition_matrix(self):
        return self._transition_matrix

    @property
    def transition_probability_matrix(self):
        return self._transition_matrix_prob

    @property
    def ngrams(self):
        return self._n_grams

    def generate_ngrams(self) -> List:
        sequences = [self._content.tokens[i:] for i in range(self.n)]

        return [' '.join(ngram) for ngram in list(zip(*sequences))]

    def generate_state_space(self) -> Dict[str, int]:
        if not self.ngrams:
            self.ngrams = self.generate_ngrams()

        n_grams_distinct = list(set(self.ngrams))
        n_grams_distinct.append("<| unknwon |>")
       
        return { v: i for i, v in enumerate(n_grams_distinct) }
    
    def generate_transition_matrix(self) -> coo_matrix:
        # create n-gram map if not already done so
        if not self.state_space:
            self.state_space = self.generate_state_space()

        # create coo matrix args
        row_ind, col_ind, values = ([] for i in range(3))
        for i in range(len(self._content.tokens[:-self.n])):
            ngram = ' '.join(self._content.tokens[i:i + self.n])
            ngram_ind = self._state_space[ngram]
            next_word_ind = self._content.mapping[self._content.tokens[i + self.n]]

            row_ind.extend([ngram_ind])
            col_ind.extend([next_word_ind])
            values.extend([1])

        # create and return coo matrix
        S = coo_matrix((values, (row_ind, col_ind)), shape=(len(self._state_space), len(self._content.mapping)))
        
        return S

    def generate_transition_matrix_prob(self) -> csr_matrix:
        transition_matrix = self.generate_transition_matrix()
        
        return normalize(transition_matrix, norm='l1', axis=1)

ss = NLPTextStateSpace(t, 3)
ss.content.info()
print('\n')
print(ss.content.tokens, '\n')
print([ss.content.mapping[t] for t in ss.content.tokens], '\n')
print(ss.content.mapping, '\n')
print(ss.state_space, '\n')
# print(ss.transition_matrix, '\n')
# print(ss.transition_probability_matrix, '\n')

Total Size: 494b
Total Tokens: 77
Total Distinct Tokens: 67


['Lorem', 'ipsum', 'dolor', 'sit', 'amet', ',', 'consectetur', 'adipiscing', 'elit', ',', 'sed', 'do', 'eiusmod', 'tempor', 'incididunt', 'ut', 'labore', 'et', 'dolore', 'magna', 'aliqua', '.', 'Ut', 'enim', 'ad', 'minim', 'veniam', ',', 'quis', 'nostrud', 'exercitation', 'ullamco', 'laboris', 'nisi', 'ut', 'aliquip', 'ex', 'ea', 'commodo', 'consequat', '.', 'Duis', 'aute', 'irure', 'dolor', 'in', 'reprehenderit', 'in', 'voluptate', 'velit', 'esse', 'cillum', 'dolore', 'eu', 'fugiat', 'nulla', 'pariatur', '.', 'Excepteur', 'sint', 'occaecat', 'cupidatat', 'non', 'proident', ',', 'sunt', 'in', 'culpa', 'qui', 'officia', 'deserunt', 'mollit', 'anim', 'id', 'est', 'laborum', '.'] 

[4, 37, 21, 58, 10, 0, 15, 7, 25, 0, 56, 20, 24, 60, 36, 62, 39, 29, 22, 42, 8, 1, 5, 26, 6, 43, 64, 0, 54, 47, 32, 61, 40, 45, 62, 9, 31, 23, 14, 16, 1, 2, 12, 38, 21, 35, 55, 35, 65, 63, 27, 13, 22, 30, 33, 48, 51, 1, 3, 57, 49, 18, 46, 52, 0, 59, 

In [8]:
# Markov interface to manage process

class MarkovChain(ABC, Generic[T]):
    
    @classmethod
    def __subclasshook__(cls, subclass):
        return (hasattr(subclass, '__init__') and
                hasattr(subclass, 'state_space') and
                callable(subclass.check_prefix) and
                hasattr(subclass, 'check_prefix') and
                callable(subclass.return_next_element) and
                hasattr(subclass, 'return_next_element') and
                callable(subclass.generate_sequence) and
                hasattr(subclass, 'generate_sequence'))

    @property
    @abstractmethod
    def state_space(self) -> T:
        """The content from which to create a state space"""
        raise NotImplementedError

    @abstractmethod
    def __init__(self, state_space: T, *args, **kwargs) -> None:
        """Constructor"""
        raise NotImplementedError

    @abstractmethod
    def check_prefix(self, prefix: Any) -> Any:
        """Checks a prefix against a state space"""
        raise NotImplementedError

    @abstractmethod
    def return_next_element(self, prefix: Any) -> Any:
        """Generates next element of sequence based on a prefix"""
        raise NotImplementedError

    @abstractmethod
    def generate_sequence(self, length: int, prefix: Any) -> Any:
        """Generates a sequence of specified length based on a prefix"""
        raise NotImplementedError


In [22]:
# Markov chain class to manage NLP Text Sequences

class NLPTextMarkovChain(MarkovChain[NLPTextStateSpace]):

    def __init__(self, state_space: NLPTextStateSpace):
        self._state_space = state_space
        self._reverse_word_mapping = { i: v for v, i in self._state_space.content.mapping.items() }

    @property
    def state_space(self) -> NLPTextStateSpace:
        return self._state_space

    def random_ngram(self) -> str:
        return np.random.choice(self.state_space.ngrams)
    
    def check_prefix(self, prefix: str) -> str:
        prefix_list = prefix.split(' ')[-self._state_space.n:]
        if len(prefix_list) < self._state_space.n:
            warnings.warn(
                f'Prefix is too short, please provide prefix of length: {self._state_space.n}. Random ngram used instead.')
            return self.random_ngram()
        else:
            prefix = ' '.join(prefix_list)
            if prefix in self.state_space.ngrams:
                return prefix
            else:
                warnings.warn(
                    'Prefix is not included in ngrams of the model. Provide another prefix. Random ngram used instead.')
                return self.random_ngram()

    def return_next_element(self, prefix: str) -> str:
        prefix = self.check_prefix(prefix)
        prefix_index = self.state_space.state_space[prefix]
        # TODO: add temperature to change weights here
        weights = self.state_space.transition_probability_matrix[prefix_index].toarray()[0]
        token_index = np.random.choice(range(len(weights)), p=weights)

        return self._reverse_word_mapping[token_index]

    def generate_sequence(self, length: int, prefix: str) -> str:
        prefix = self.check_prefix(prefix)
        print(prefix)
        sequence = prefix.split(' ')
        range_length = length - len(sequence)

        for i in range(range_length):
            next_word = self.return_next_element(prefix)
            sequence.append(next_word)
            prefix = ' '.join(sequence[-self._state_space.n:])

        return ' '.join(sequence)

text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
ss = NLPTextStateSpace.create_from_text(text, 2)
chain = NLPTextMarkovChain(ss)

t = 'do eiusmod tempor'
print(chain.return_next_element(t))
print(chain.generate_sequence(16, t))


incididunt
eiusmod tempor
eiusmod tempor incididunt ut labore et dolore magna aliqua . Ut enim ad minim veniam ,


In [10]:
# Testing

# Tokenizer testing
class TestTokenizers(unittest.TestCase):

    def setUp(self):
        self.text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

    def test_subclass(self):
        self.assertTrue(issubclass(NLPTextTokens, SequenceTokenizerInterface))
    
    def test_instantiation(self):
        # arrange, act, assert
        try:
            _ = NLPTextTokens(self.text)
        except Exception:
            self.fail("TextTokens() raised Exception unexpectedly!")

        with self.assertRaises(TypeError):
            NLPTextTokens(1234)
        
    def test_text_tokens(self):
        # arrange, act
        t = NLPTextTokens(self.text)

        # assert
        self.assertEqual(getsizeof(t.raw), 494)
        self.assertEqual(len(t.tokens), 77)
        self.assertEqual(len(t.mapping.keys()), 67)
        
# State Space testing
class TestStateSpaces(unittest.TestCase):

    def setUp(self):
        self.text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
        self.tokens = NLPTextTokens(self.text)

    def test_instantiation(self):
        # arrange, act, assert
        try:
            _ = NLPTextStateSpace(self.tokens)
        except Exception:
            self.fail("TextTokens() raised Exception unexpectedly!")

        with self.assertRaises(TypeError):
            NLPTextStateSpace(1234)
        with self.assertRaises(ValueError):
            NLPTextStateSpace(self.tokens, 6)


unittest.main(argv=[''], verbosity=2, exit=False)

test_instantiation (__main__.TestStateSpaces) ... ok
test_instantiation (__main__.TestTokenizers) ... ok
test_subclass (__main__.TestTokenizers) ... ok
test_text_tokens (__main__.TestTokenizers) ... ok

----------------------------------------------------------------------
Ran 4 tests in 0.006s

OK


<unittest.main.TestProgram at 0x13efc78b0>