# Markov

**Algoritmo para criação de uma HMM**
1. Defina o espaço de estados e o espaço de observações
2. Defina a distribuição do estado inicial
3. Defina a probabilidade das transições de estado
4. Defina a probabilidade de uma observação dada um estado
5. Crie o modelo
6. Dada uma observação, retorne a sequência mais provável de estados
escondidos
7. Avalie o modelo

## Contexto

- Observações: Palavras do vocabulário das letras.
- Estados Ocultos: Classes Gramaticais (POS tags).
- π (distribuição inicial): Probabilidade de cada classe gramatical iniciar um verso/frase.
- A (matriz de transição): Probabilidade de uma classe gramatical seguir outra.
- B (matriz de emissão): Probabilidade de uma palavra específica ser gerada por uma determinada classe gramatical.

## Leitura dos Dados

A função `get_text_list_from_files` recebe o nome da pasta e retorna os conteúdos dos arquivos como uma lista de strings.

In [1]:
import os
import glob


def get_text_list_from_files( folder_name: str ) -> list[ str ]:
    """
    Lê o conteúdo de múltiplos arquivos e retorna uma lista de strings,
    onde cada string representa o conteúdo de um arquivo.

    Args:
        folder_name (str): O nome da pasta onde os arquivos de texto estão localizados.
                           Espera-se que os arquivos estejam dentro de um subdiretório chamado 'musicas'.

    Returns:
        list[str]: Uma lista onde cada elemento é o conteúdo dos arquivos lidos.
    """

    # Utiliza a biblioteca glob para encontrar todos os arquivos com extensão .txt
    # dentro do subdiretório especificado dentro de 'musicas'.
    files = glob.glob( f"musicas/{folder_name}/*.txt" )

    text_list: list[ str ] = [ ]
    for file_path in files:
        with open( file_path, "r", encoding = "utf-8" ) as file:
            # Lê todo o conteúdo do arquivo de uma vez.
            # Se o arquivo contiver várias linhas e você precisar de cada linha como um item separado na lista,
            # você pode iterar sobre o objeto 'file' (por exemplo, `for line in file: text_list.append(line.strip())`).
            text_list.append( file.read() )
    return text_list


# Cria o DataSet de treino chamando a função com o nome da pasta 'train'.
train_ds = get_text_list_from_files( "train" )

# Cria o DataFrame de teste/validação chamando a função com o nome da pasta 'test'.
test_ds = get_text_list_from_files( "test" )

# Imprime o tamanho (número de linhas) do DataFrame de treino.
print( f"Tamanho do DataFrame de Treino: {len( train_ds )}" )

# Imprime o tamanho (número de linhas) do DataFrame de teste/validação.
print( f"Tamanho do DataFrame de Teste/Validação: {len( test_ds )}" )

Tamanho do DataFrame de Treino: 1444
Tamanho do DataFrame de Teste/Validação: 361


## Pré-processamento: Tagging Gramatical (POS Tagging)

Nesta seção, preparamos as ferramentas e definimos a função principal para realizar o Part-of-Speech (POS) Tagging nos nossos textos. O POS tagging é uma etapa fundamental em muitos pipelines de Processamento de Linguagem Natural (PLN), onde atribuímos a cada palavra sua classe gramatical (verbo, substantivo, adjetivo, etc.).

Utilizaremos a biblioteca `spaCy` com seu modelo pré-treinado para o português (`pt_core_news_lg`), que oferece uma boa performance e detalhes nas tags. O código a seguir carrega o modelo (fazendo o download se necessário) e define a função `tag_sequences` que processará listas de textos.

In [2]:
import spacy

# Carregando o modelo português do spaCy
try:
    nlp = spacy.load( "pt_core_news_lg" )
except OSError:
    print( "Modelo 'pt_core_news_lg' não encontrado. Baixando..." )
    os.system( "python -m spacy download pt_core_news_lg" )
    nlp = spacy.load( "pt_core_news_lg" )


def tag_sequences( text_list: list[ str ] ) -> list[ list[ tuple[ str, str ] ] ]:
    """
    Aplica POS tagging a uma lista de textos (frases/versos).

    Args:
        text_list (list[str]): Lista de strings (frases ou versos).

    Returns:
        list[list[tuple[str, str]]]: Uma lista de sequências taggeadas.
                                     Cada sequência é uma lista de tuplas (palavra, tag).
    """
    tagged_sequences = [ ]
    # Processa os textos em lote para eficiência
    # A limpeza agressiva de pontuação deve vir DEPOIS ou ser feita pelo tagger
    docs = nlp.pipe( text_list )  # Processa a lista de textos

    for doc in docs:
        sequence = [ ]
        for token in doc:
            # token.text: a palavra/token original
            # token.pos_: a classe gramatical Universal Dependencies (mais simples, ex: NOUN, VERB, ADJ)
            if not token.is_punct and not token.is_space:  # Opcional: ignorar pontuação e espaços aqui
                sequence.append( (token.text.lower(), token.pos_) )  # Armazena palavra (minúscula) e tag
        if sequence:  # Adiciona apenas se a sequência não estiver vazia
            tagged_sequences.append( sequence )
    return tagged_sequences


In [3]:
tagged_train_sequences = tag_sequences( train_ds )
tagged_test_sequences = tag_sequences( test_ds )

## Preparação para Modelagem: Vocabulário e Estados

Com as sequências devidamente taggeadas (armazenadas em `tagged_train_sequences`), o próximo passo é preparar esses dados para serem utilizados em modelos que requerem representação numérica. Para isso, precisamos:

1.  **Identificar o Vocabulário:** Extrair todas as palavras únicas presentes no corpus de treino.
2.  **Identificar os Estados:** Extrair todas as tags gramaticais (POS tags) únicas, que representarão os "estados" em modelos como HMMs.
3.  **Criar Mapeamentos para Índices:** Construir dicionários que associam cada palavra única e cada tag única a um índice numérico inteiro. Isso é essencial para criar matrizes de probabilidade ou vetores de entrada para redes neurais.

O código a seguir realiza essas tarefas: percorre as sequências taggeadas, utiliza `set` para coletar itens únicos, ordena as listas resultantes para garantir consistência (`vocabulary`, `states`), e cria os dicionários de mapeamento (`word_to_index`, `tag_to_index`). Por fim, exibe o tamanho do vocabulário e o número de estados encontrados.

In [4]:
import numpy as np

all_words = set()
all_tags = set()

for sequence in tagged_train_sequences:
    for word, tag in sequence:
        all_words.add( word )  # Adiciona palavra ao set
        all_tags.add( tag )  # Adiciona tag ao set

# Ordenar para ter índices consistentes
vocabulary = sorted( list( all_words ) )
states = sorted( list( all_tags ) )

# Criar mapeamentos para índices (essencial para matrizes numpy)
word_to_index = { word: i for i, word in enumerate( vocabulary ) }
tag_to_index = { tag: i for i, tag in enumerate( states ) }

n_vocab = len( vocabulary )
n_states = len( states )

print( f"Tamanho do vocabulário: {n_vocab}" )
print( f"Número de estados (tags) únicos: {n_states}" )

# print("Estados:", states)
# print("Vocabulário (primeiros 50):", vocabulary[:50])

Tamanho do vocabulário: 11162
Número de estados (tags) únicos: 16


## Cálculo do Vetor π (Probabilidades Iniciais)

Este bloco de código calcula o vetor de probabilidades iniciais para um modelo de sequência, onde cada elemento de `pi` representa a chance de uma tag ser a primeira em uma sequência.

- **Contagem:**
  Conta quantas vezes cada tag aparece na posição inicial de cada sequência usando `Counter`.

- **Inicialização:**
  Cria um vetor `pi` com zeros e atribui as contagens correspondentes às tags usando um mapeamento (`tag_to_index`).

- **Normalização:**
  Converte as contagens em probabilidades dividindo pelo número total de sequências. Caso não haja sequências, utiliza uma distribuição uniforme.


In [5]:
from collections import Counter

initial_tag_counts = Counter()
total_sequences = len( tagged_train_sequences )

for sequence in tagged_train_sequences:
    if sequence:  # Verifica se a sequência não está vazia
        first_tag = sequence[ 0 ][ 1 ]  # Pega a tag da primeira tupla (palavra, tag)
        initial_tag_counts[ first_tag ] += 1

# Inicializa o vetor pi com zeros
pi = np.zeros( n_states )
for tag, count in initial_tag_counts.items():
    if tag in tag_to_index:  # Garante que a tag está no nosso conjunto de estados
        pi[ tag_to_index[ tag ] ] = count

# Normalizar para obter probabilidades
if total_sequences > 0:
    pi = pi / total_sequences
else:
    # Caso não haja sequências, distribui uniformemente (ou define como zero)
    pi = np.ones( n_states ) / n_states

print( f"Vetor Pi (Prob. Iniciais) calculado. Shape: {pi.shape}" )

Vetor Pi (Prob. Iniciais) calculado. Shape: (16,)


## Cálculo da Matriz A de Transição

Este código constrói a matriz de probabilidades de transição \( A \), onde cada elemento \( A[i, j] \) representa a probabilidade de transitar da tag \( i \) para a tag \( j \).

- **Suavização de Laplace:**
  Inicializamos as contagens com um valor pequeno (`alpha_smooth = 0.1`) para evitar probabilidades zero em transições não vistas.

- **Contagem de Transições:**
  Para cada sequência de treinamento, o código incrementa as contagens de transição entre tags consecutivas e ajusta as contagens de origens.

- **Normalização:**
  As contagens de transições são normalizadas pela soma total de transições originadas em cada tag, resultando na matriz \( A \) com cada linha representando uma distribuição de probabilidade que soma 1.


In [6]:
# Matriz A[i, j] = P(tag_j | tag_i)
# Usaremos contagens e depois normalizaremos

# Adicionar suavização de Laplace (add-alpha smoothing) é recomendado
# para evitar probabilidades zero para transições não vistas.
alpha_smooth = 0.1  # Um pequeno valor de suavização

# Contagem de transições (tag_i -> tag_j)
# Inicializa com alpha para suavização
transition_counts = np.full( (n_states, n_states), alpha_smooth )

# Contagem total de vezes que cada tag_i origina uma transição
# Inicializa com alpha * n_states para consistência na normalização com suavização
tag_origin_counts = np.full( n_states, alpha_smooth * n_states )

for sequence in tagged_train_sequences:
    for i in range( len( sequence ) - 1 ):  # Itera até a penúltima tupla
        current_tag = sequence[ i ][ 1 ]
        next_tag = sequence[ i + 1 ][ 1 ]

        if current_tag in tag_to_index and next_tag in tag_to_index:
            idx_current = tag_to_index[ current_tag ]
            idx_next = tag_to_index[ next_tag ]

            transition_counts[ idx_current, idx_next ] += 1
            tag_origin_counts[ idx_current ] += 1  # Incrementa a contagem de origem

# Normalizar para obter a matriz A de probabilidades
# A[i, j] = count(tag_i -> tag_j) / count(tag_i como origem)
A = transition_counts / tag_origin_counts[ :, np.newaxis ]  # Divide cada linha pelo total de origem correspondente

# Verifica se as linhas somam 1 (ou muito próximo devido a float precision)
# print("Soma das linhas da Matriz A (deve ser próximo de 1):", np.sum(A, axis=1))
print( f"Matriz A (Transição) calculada. Shape: {A.shape}" )

Matriz A (Transição) calculada. Shape: (16, 16)


## Cálculo da Matriz B de Emissão

Este bloco constrói a matriz \(B\), onde cada elemento \(B[j, k]\) representa a probabilidade de uma tag \(j\) emitir uma palavra \(k\).

- **Suavização de Laplace:**
  Inicializa as contagens com um valor pequeno para lidar com emissões não observadas.

- **Contagem de Emissões:**
  Para cada par (palavra, tag) na sequência de treinamento, incrementa as contagens para a emissão da palavra pela tag correspondente.

- **Normalização:**
  Cada linha da matriz \(B\) é normalizada dividindo pelas contagens totais de cada tag, gerando distribuições de probabilidade para as emissões.

In [7]:
# Matriz B[j, k] = P(palavra_k | tag_j)
# Usaremos contagens e depois normalizaremos

# Contagem de emissões (tag_j -> palavra_k)
# Inicializa com alpha para suavização
emission_counts = np.full( (n_states, n_vocab), alpha_smooth )

# Contagem total de vezes que cada tag_j aparece
# Inicializa com alpha * n_vocab para consistência
tag_total_counts = np.full( n_states, alpha_smooth * n_vocab )

for sequence in tagged_train_sequences:
    for word, tag in sequence:
        if tag in tag_to_index and word in word_to_index:
            idx_tag = tag_to_index[ tag ]
            idx_word = word_to_index[ word ]

            emission_counts[ idx_tag, idx_word ] += 1
            tag_total_counts[ idx_tag ] += 1  # Incrementa a contagem total da tag

# Normalizar para obter a matriz B de probabilidades
# B[j, k] = count(tag_j emitindo palavra_k) / count(total de tag_j)
B = emission_counts / tag_total_counts[ :, np.newaxis ]  # Divide cada linha pelo total da tag correspondente

# Verifica se as linhas somam 1 (ou muito próximo)
# print("Soma das linhas da Matriz B (deve ser próximo de 1):", np.sum(B, axis=1))
print( f"Matriz B (Emissão) calculada. Shape: {B.shape}" )

Matriz B (Emissão) calculada. Shape: (16, 11162)


## Mapeamentos Inversos

Para podermos interpretar os resultados (converter índices de volta para palavras e tags).

In [8]:
# Assumindo que você já tem word_to_index e tag_to_index

index_to_word = { index: word for word, index in word_to_index.items() }
index_to_tag = { index: tag for tag, index in tag_to_index.items() }

print( "Mapeamentos inversos criados." )

Mapeamentos inversos criados.


## Algoritmo de Viterbi

Esta célula implementa o algoritmo de Viterbi para encontrar a sequência de tags mais provável para uma sequência de palavras, usando os parâmetros do HMM.

**Etapas:**
- **Inicialização:** Calcula as probabilidades para a primeira palavra, tratando palavras desconhecidas (OOV) com baixa probabilidade.
- **Recursão:** Atualiza as probabilidades acumuladas e mantém os "backpointers" para cada tag em cada posição.
- **Terminação e Backtracking:** Seleciona a última tag com a maior probabilidade e reconstrói a sequência ótima de tags.

O resultado é a lista de tags associadas à sequência de observações.


In [9]:
import numpy as np


def viterbi( obs_sequence: list[ str ],
             pi: np.ndarray,
             A: np.ndarray,
             B: np.ndarray,
             word_to_index: dict,
             tag_to_index: dict,
             index_to_tag: dict
             ) -> list[ str ]:
    """
    Implementa o Algoritmo de Viterbi para encontrar a sequência de estados (tags)
    mais provável para uma dada sequência de observações (palavras).

    Args:
        obs_sequence (list[str]): A sequência de palavras observadas.
        pi (np.ndarray): Vetor de probabilidades iniciais dos estados (shape: n_states).
        A (np.ndarray): Matriz de transição de estados (shape: n_states x n_states).
                           A[i, j] = P(estado_j | estado_i).
        B (np.ndarray): Matriz de emissão (shape: n_states x n_vocab).
                           B[j, k] = P(palavra_k | estado_j).
        word_to_index (dict): Mapeamento de palavra para índice no vocabulário.
        tag_to_index (dict): Mapeamento de tag (estado) para índice de estado.
        index_to_tag (dict): Mapeamento de índice de estado para tag.

    Returns:
        list[str]: A sequência de tags mais provável.
                   Retorna lista vazia se a sequência de observação for vazia.
    """
    n_states = A.shape[ 0 ]
    n_obs = len( obs_sequence )
    n_vocab = B.shape[ 1 ]  # Necessário para lidar com OOV

    if n_obs == 0:
        return [ ]

    # Adicionar um valor pequeno para evitar log(0) -> -inf
    epsilon = 1e-10
    log_pi = np.log( pi + epsilon )
    log_A = np.log( A + epsilon )
    log_B = np.log( B + epsilon )

    # Matriz T1 (viterbi_prob): Guarda a probabilidade máxima (em log) do caminho
    # que termina no estado j no tempo t. Shape: (n_states, n_obs)
    viterbi_prob = np.zeros( (n_states, n_obs) )

    # Matriz T2 (backpointer): Guarda o índice do estado anterior (no tempo t-1)
    # que levou à probabilidade máxima em T1[j, t]. Shape: (n_states, n_obs)
    backpointer = np.zeros( (n_states, n_obs), dtype = int )

    first_word = obs_sequence[ 0 ]
    if first_word in word_to_index:
        first_word_idx = word_to_index[ first_word ]
        # Probabilidade de emissão da primeira palavra para cada estado
        log_emission_prob_t0 = log_B[ :, first_word_idx ]
    else:
        # Palavra fora do vocabulário (OOV) - Atribui uma baixa prob. uniforme
        # print( f"Aviso: Palavra '{first_word}' não está no vocabulário de treino. Usando baixa probabilidade." )
        log_emission_prob_t0 = np.full( n_states, np.log( epsilon / n_states ) )  # Distribui epsilon

    # Probabilidade inicial = prob inicial do estado + prob de emitir a 1ª palavra
    viterbi_prob[ :, 0 ] = log_pi + log_emission_prob_t0
    backpointer[ :, 0 ] = 0  # Não há estado anterior

    for t in range( 1, n_obs ):
        word = obs_sequence[ t ]
        if word in word_to_index:
            word_idx = word_to_index[ word ]
            log_emission_prob_t = log_B[ :, word_idx ]
        else:
            # Lidar com OOV para palavras subsequentes
            # print( f"Aviso: Palavra '{word}' não está no vocabulário de treino. Usando baixa probabilidade." )
            log_emission_prob_t = np.full( n_states, np.log( epsilon / n_states ) )

        for j in range( n_states ):  # Para cada estado atual j no tempo t
            # Calcula a probabilidade de chegar ao estado j vindo de *cada* estado i no tempo t-1
            # prob = prob_max(t-1)_i + prob_trans(i -> j) + prob_emiss(palavra_t | estado_j)
            # Note que log_emission_prob_t é o mesmo para todos os 'i' ao calcular para o estado 'j'
            probs_via_prev_state = viterbi_prob[ :, t - 1 ] + log_A[ :, j ] + log_emission_prob_t[ j ]

            # Encontra a probabilidade máxima e o estado anterior que a gerou
            viterbi_prob[ j, t ] = np.max( probs_via_prev_state )
            backpointer[ j, t ] = np.argmax( probs_via_prev_state )

    # Encontra o índice do estado final com a maior probabilidade
    last_state_idx = np.argmax( viterbi_prob[ :, n_obs - 1 ] )
    # max_log_prob = viterbi_prob[last_state_idx, n_obs - 1] # Se precisar da prob. total

    # Reconstrói o caminho ótimo de trás para frente
    best_path_indices = [ 0 ] * n_obs
    best_path_indices[ n_obs - 1 ] = last_state_idx

    for t in range( n_obs - 2, -1, -1 ):  # Itera de n_obs-2 até 0
        # O estado no tempo t é dado pelo backpointer do estado escolhido no tempo t+1
        best_path_indices[ t ] = backpointer[ best_path_indices[ t + 1 ], t + 1 ]

    # Converte os índices do caminho de volta para os nomes das tags
    best_path_tags = [ index_to_tag[ idx ] for idx in best_path_indices ]

    return best_path_tags

## Avaliação do Modelo HMM no Conjunto de Teste

Agora que temos o modelo treinado (π, A, B) e o algoritmo de Viterbi para fazer predições, podemos avaliar o desempenho do nosso HMM de POS tagging.

Utilizaremos o conjunto de dados `tagged_test_sequences`, que não foi usado durante o treinamento das matrizes de probabilidade. Para cada sequência no conjunto de teste, faremos o seguinte:
1. Extrair a sequência de palavras (observações).
2. Extrair a sequência de tags reais (ground truth).
3. Usar a função `viterbi` para prever as tags com base nas palavras observadas.
4. Comparar as tags preditas com as tags reais e contar o número de acertos.

Ao final, calcularemos a acurácia geral do modelo, que é a proporção de tags corretamente identificadas em relação ao número total de tags no conjunto de teste.

In [10]:
# Inicializa contadores para a avaliação
total_tags_evaluated = 0
correct_tags_predicted = 0

print( f"Iniciando avaliação em {len( tagged_test_sequences )} sequências de teste..." )

# Itera sobre cada sequência no conjunto de teste
for i, sequence_tuples in enumerate( tagged_test_sequences ):
    if not sequence_tuples:  # Pula sequências vazias, se houver
        continue

    # Extrai as palavras (observações) e as tags reais
    obs_sequence = [ word for word, tag in sequence_tuples ]
    actual_tags = [ tag for word, tag in sequence_tuples ]

    # Usa o Viterbi para prever as tags
    predicted_tags = viterbi(
            obs_sequence = obs_sequence,
            pi = pi,
            A = A,
            B = B,
            word_to_index = word_to_index,
            tag_to_index = tag_to_index,
            index_to_tag = index_to_tag
    )

    # Verifica se o Viterbi retornou uma sequência com o mesmo comprimento
    if len( predicted_tags ) == len( actual_tags ):
        # Compara as tags preditas com as reais
        for predicted, actual in zip( predicted_tags, actual_tags ):
            if predicted == actual:
                correct_tags_predicted += 1
        total_tags_evaluated += len( actual_tags )  # Adiciona o número de tags nesta sequência ao total
    else:
        # Se os comprimentos diferirem, algo deu errado no Viterbi para esta sequência.
        # É bom registrar isso, mas para a acurácia, podemos apenas contar as tags reais no total
        # e não adicionar acertos.
        print( f"AVISO: Comprimento da sequência {i} difere! "
               f"Observações: {len( obs_sequence )}, Reais: {len( actual_tags )}, Preditas: {len( predicted_tags )}"
               )
        # Mesmo com erro, contabilizamos as tags que deveriam ter sido avaliadas.
        total_tags_evaluated += len( actual_tags )

print( "\nAvaliação concluída." )

# Calcula a acurácia final
if total_tags_evaluated > 0:
    accuracy = correct_tags_predicted / total_tags_evaluated
    print( "\n--- Resultados da Avaliação do Modelo HMM ---" )
    print( f"Total de Tags Avaliadas no Conjunto de Teste: {total_tags_evaluated}" )
    print( f"Total de Tags Preditas Corretamente: {correct_tags_predicted}" )
    print( f"Acurácia Geral do POS Tagging: {accuracy:.2%}" )  # Exibe como porcentagem
else:
    print( "\nNão foi possível calcular a acurácia (nenhuma tag avaliada). "
           "Verifique o conjunto de teste."
           )

Iniciando avaliação em 361 sequências de teste...

Avaliação concluída.

--- Resultados da Avaliação do Modelo HMM ---
Total de Tags Avaliadas no Conjunto de Teste: 56441
Total de Tags Preditas Corretamente: 49881
Acurácia Geral do POS Tagging: 88.38%
