In [283]:
import random
import re
from collections import defaultdict
from pathlib import Path
import networkx as nx
from manim import *
from selma import BACKGROUND
from selma.graph import MGraph, gvlayout_factory, test_draw

config.background_color = BACKGROUND

In [284]:
class MarkovChainTextGenerator:
  def __init__(
    self, order=2, token_type='word', smoothing_alpha=0.001, interpolation=True
  ):
    """
    Markov Chain Text Generator with variable order and tokenization type.

    Parameters:
        order (int): Order of the Markov chain (n-gram size)
        token_type (str): 'word' for word-level, 'char' for character-level
        smoothing_alpha (float): Laplace smoothing factor
        interpolation (bool): Use backoff interpolation for unseen sequences
    """
    self.order = order
    self.token_type = token_type
    self.smoothing_alpha = smoothing_alpha
    self.interpolation = interpolation
    self.ngram_counts = defaultdict(lambda: defaultdict(int))
    self.total_counts = defaultdict(int)
    self.vocab = set()

  def tokenize(self, text):
    """Tokenizes text into words or characters based on user preference."""
    if self.token_type == 'word':
      return re.findall(r'\b\w+\b', text.lower())  # Simple word tokenization
    elif self.token_type == 'char':
      return list(text.lower())  # Character-level tokenization
    else:
      raise ValueError("token_type must be 'word' or 'char'")

  def train(self, corpus):
    """Trains the Markov chain model on a given corpus."""
    tokens = self.tokenize(corpus)
    self.vocab.update(tokens)
    self.vocab.add('<UNK>')  # Handle unknown words

    for i in range(len(tokens) - self.order):
      prefix = tuple(tokens[i : i + self.order])  # n-gram prefix
      next_token = tokens[i + self.order]  # next token

      self.ngram_counts[prefix][next_token] += 1
      self.total_counts[prefix] += 1

  def get_next_token_probs(self, prefix):
    """
    Retrieves transition probabilities using interpolation.
    """
    prefix = tuple(prefix)
    if prefix in self.ngram_counts:
      total = self.total_counts[prefix] + self.smoothing_alpha * len(self.vocab)
      return {
        token: (count + self.smoothing_alpha) / total
        for token, count in self.ngram_counts[prefix].items()
      }

    # Backoff: Try shorter prefixes
    if self.interpolation:
      for k in range(self.order - 1, 0, -1):
        shorter_prefix = prefix[-k:]
        if shorter_prefix in self.ngram_counts:
          total = self.total_counts[shorter_prefix] + self.smoothing_alpha * len(
            self.vocab
          )
          return {
            token: (count + self.smoothing_alpha) / total
            for token, count in self.ngram_counts[shorter_prefix].items()
          }

    # If no known transitions, return uniform probabilities over vocab
    return {token: 1 / len(self.vocab) for token in self.vocab}

  def generate(self, seed=None, length=50):
    """
    Generates text based on the trained model.
    """
    if not self.ngram_counts:
      raise ValueError('Model is not trained. Call `train(corpus)` first.')

    tokens = (
      list(self.tokenize(seed))
      if seed
      else [random.choice(list(self.vocab - {'<UNK>'}))]
    )

    while len(tokens) < length:
      prefix = tuple(tokens[-self.order :])
      probs = self.get_next_token_probs(prefix)

      next_token = random.choices(list(probs.keys()), weights=list(probs.values()))[0]
      tokens.append(next_token)

    return ' '.join(tokens) if self.token_type == 'word' else ''.join(tokens)

In [285]:
corpora = {
  'mc': """
Markov chains are mathematical systems that undergo transitions from one state to another 
on a state space. A sequence of possible events in which the probability of each event 
depends only on the state attained in the previous event is called a Markov chain.
""",
  'ttls': """
Twinkle, twinkle, little star,
How I wonder what you are!
Up above the world so high,
Like a diamond in the sky.
""",
  'gadda': 'Come poco pepe a coppia cuoce e scoppia',  # Gadda
  'giovanni': 'In principio era il Verbo, e il Verbo era presso Dio, e il Verbo era Dio',  # dal Vangelo di Giovanni, 1:1
  'commedia': Path('../../data/commedia.txt').read_text(),
  'promessi': Path('../../data/papini.txt').read_text()
}

In [286]:
def clean_text(corpus):
  res = [' ']
  for c in corpus:
    l = c.lower()
    if l.isalpha():
      res.append(l)
    elif l == ' ' and res[-1] != ' ':
      res.append(' ')
  return ''.join(res[1:])

In [331]:
CORPUS = 'giovanni'

ORDER = 1
generator = MarkovChainTextGenerator(
  order=ORDER, token_type='word', smoothing_alpha=0, interpolation=False
)
generator.train(clean_text(corpora[CORPUS]))

In [332]:
generator.generate('dio', length=100)

'dio e il verbo e il verbo e il verbo era dio e il verbo era dio e il verbo era il verbo era presso dio e il verbo era il verbo e il verbo era dio e il verbo era presso dio e il verbo era il verbo era presso dio e il verbo e il verbo era presso dio e il verbo e il verbo e il verbo era presso dio e il verbo era presso dio e il verbo era presso dio e il verbo era presso dio e il verbo era dio e il verbo era'

In [None]:
def build_markov_chain(scene, corpus, by_char, layout, node_scale):
  
  if by_char:
    data = [c for c in clean_text(corpus) if c != ' ']
  else:
    data = clean_text(corpus).split()

  seq = list(zip(data, data[1:]))
  G = nx.DiGraph(set(seq))
  weight = {e: 0 for e in G.edges()}
  G.remove_edges_from(nx.selfloop_edges(G))
  
  MG = MGraph(G, layout=layout, node_scale=node_scale)
  
  tt = [t for d in data for t in (d, ' ')]
  T = Tex(*tt, color=BLACK).scale(.9)
  T.to_edge(UL)
  scene.add(T)
  
  def highlight(t):
    mt = MG.mnode(t)
    mt.z_index = 1
    mt.set_stroke(color=PURE_GREEN)
    T[highlight.step].set_color(PURE_GREEN)
    scene.add(mt)
    scene.wait(.5)
    mt.set_stroke(color=DARK_BROWN)
    T[highlight.step].set_color(BLACK)
    highlight.step += 2
  highlight.step = 0

  highlight(seq[0][0])
  for s, t in seq:
    weight[(s, t)] += 1
    if s != t:
      me = MG.medge(s, t)
      me.z_index = 0
      me.set_stroke(width=weight[(s, t)] * 2, color=PURE_GREEN) 
      scene.add(me)
    else:
      me = None
    highlight(t)
    if me: me.set_stroke(color=BLACK) 

In [None]:
%%manim --hide-splash -qm -v WARNING MarkovByChar

layout = 

class MarkovByChar(Scene):
  def construct(self):
    build_markov_chain(self, corpora['gadda'], by_char=True, layout = gvlayout_factory('dot', heightscale=.5))


In [None]:
%%manim --hide-splash -qm -v WARNING MarkovByWord

layout = gvlayout_factory('dot', heightscale=.5)

class MarkovByWord(Scene):
  def construct(self):
    build_markov_chain(self, corpora['giovanni'], by_char=False, layout = gvlayout_factory('dot', heightscale=.5))
