In [None]:
import pandas as pd
import json
import re
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from collections import Counter
from typing import List, Dict, Tuple

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torchcrf import CRF

In [None]:
# Keywords compared (case-insensitively) against the start of a row's first cell;
# each keyword yields its own `starts_with_keyword_<keyword>` feature.
KEYWORDS_FOR_STARTS_WITH = ["Province", "City", "Municipality", "Barangay", "Zone", "Street"]
# Case-insensitive pattern behind the `contains_keyword_zv` row feature.
# It is applied with `.search`, so a match anywhere in the row text counts.
# Alternatives, in order:
#   1. an ordinal ("1ST", "2ND", "3RD", "4TH", ...) followed by "REVISION" or "Rev",
#      optionally followed by a "Z.V. ... SQ ... M" fragment;
#   2. an ordinal followed by "REVISION", or "Rev ZV ... SQ M";
#   3. a "Z.V. ... SQ ... M" fragment whose first letter may also be the digit "2"
#      (NOTE(review): presumably tolerates "Z" misread as "2" — confirm),
#      or the text "FINAL" (no word boundary, so it also matches inside longer words).
ZV_PATTERN_REGEX = re.compile(
    r"\d+(?:ST|ND|RD|TH)\s+(?:REVISION|Rev)(?:.*Z\.?V\.?.*SQ.*M\.?)?|"
    r"(?:\d+(?:ST|ND|RD|TH)\s+REVISION|Rev\s+ZV\s+/?.*SQ\.?\s*M\.?)|"
    r"(?:Z|2)\.?V\.?.*SQ.*M\.?|FINAL",
    re.IGNORECASE
)

In [None]:
# Upper bound on the vocabulary (default `max_tokens` of TextVectorizer);
# two of these slots are reserved for the pad and unk tokens.
VOCAB_SIZE = 20000
EMBEDDING_DIM = 128  # NOTE(review): presumably the token-embedding width — not used in the cells shown here
# Number of token ids each row text is padded/truncated to
# (default `output_sequence_length` of TextVectorizer).
TEXT_SEQ_OUTPUT_LEN = 50
LSTM_HIDDEN_DIM = 128  # NOTE(review): presumably the LSTM hidden size — confirm against the model cell
# Training-loop settings; their consumers are not visible in the cells shown here.
BATCH_SIZE = 4
EPOCHS = 50
LEARNING_RATE = 1e-3
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Special vocabulary entries; TextVectorizer assigns them ids 0 and 1 respectively.
PAD_TOKEN = "<pad>"
UNK_TOKEN = "<unk>"

# Feature Engineering

In [None]:
def is_numeric_str(s: str) -> bool:
    """Return True when ``s`` parses as a finite number.

    Peso signs ("₱") and commas (thousands separators) are removed before
    parsing, so values such as "₱1,234.50" count as numeric.

    Args:
        s: Cell text to test. Empty strings are never numeric.

    Returns:
        True if the cleaned text converts to a finite float, False otherwise.
    """
    if not s:
        return False
    cleaned = re.sub(r'[₱,]', '', s)
    try:
        value = float(cleaned)
    except ValueError:
        return False
    # float() also accepts "nan", "inf" and "infinity" (any case, optional sign).
    # Those are words or missing-value markers, not numbers found in a cell,
    # so only finite results count as numeric.
    return bool(np.isfinite(value))


def extract_single_row_features(raw_cells_list: list[str]) -> dict:
    """Turn the cells of one row into a text string plus a numeric feature vector.

    Args:
        raw_cells_list: Cell values of the row, in column order. ``None`` entries
            and cells containing only whitespace are treated as empty.

    Returns:
        A dict with two entries:

        * ``'concatenated_text_clean'``: the non-empty cell texts joined by spaces.
        * ``'numeric_features'``: a float32 array holding, in order, the cell
          count, first and last non-empty column index (-1 when the row has no
          content), non-empty cell count and ratio, numeric cell count and ratio,
          the empty-row flag, the ZV-pattern flag, the all-caps flag, and one
          starts-with flag per entry of ``KEYWORDS_FOR_STARTS_WITH``.
    """

    def _has_content(cell) -> bool:
        return cell is not None and str(cell).strip() != ""

    # (column index, text) for every cell that holds more than whitespace.
    filled_cells = [(col, str(cell)) for col, cell in enumerate(raw_cells_list) if _has_content(cell)]
    filled_texts = [text for _, text in filled_cells]
    joined_text = " ".join(filled_texts)

    total_cells = float(len(raw_cells_list))
    filled_count = float(len(filled_cells))
    numeric_count = sum(1 for text in filled_texts if is_numeric_str(text))

    # Keyword flags look at the cell in column 0 only, even when it is blank.
    leading_cell = ""
    if raw_cells_list and raw_cells_list[0] is not None:
        leading_cell = str(raw_cells_list[0]).lower()

    # The ZV pattern is searched over every non-None cell, blank ones included.
    zv_haystack = " ".join(str(cell) for cell in raw_cells_list if cell is not None)

    # Insertion order below is the order of the output vector.
    features = {
        'num_cells_in_row': total_cells,
        'first_non_null_column': float(filled_cells[0][0]) if filled_cells else -1.0,
        'last_non_null_column': float(filled_cells[-1][0]) if filled_cells else -1.0,
        'num_non_empty_cells': filled_count,
        'ratio_non_empty_cells': filled_count / total_cells if total_cells > 0 else 0.0,
        'num_numeric_cells': float(numeric_count),
        'ratio_numeric_cells': numeric_count / filled_count if filled_count > 0 else 0.0,
        'is_row_empty_or_whitespace_only': 1.0 if filled_count == 0 else 0.0,
        'contains_keyword_zv': 1.0 if ZV_PATTERN_REGEX.search(zv_haystack) else 0.0,
        'is_all_caps': 1.0 if joined_text.isupper() else 0.0,
    }
    base_names = list(features)

    keyword_names = [f'starts_with_keyword_{kw.lower().replace(" ", "_")}' for kw in KEYWORDS_FOR_STARTS_WITH]
    for name, kw in zip(keyword_names, KEYWORDS_FOR_STARTS_WITH):
        features[name] = 1.0 if leading_cell.startswith(kw.lower()) else 0.0

    ordered_values = [features[name] for name in base_names + keyword_names]
    return {
        'concatenated_text_clean': joined_text,
        'numeric_features': np.array(ordered_values, dtype=np.float32)
    }

# Text Vectorization

TODO
- Handle out-of-vocabulary tokens.
- Save the fitted vectorizer so it persists and can be reused.

Implement something like this so that important abbreviations such as "Z.V." can be kept as tokens if needed:
```
def tokenize(self, text):
    # Preserve important spreadsheet tokens like "Z.V."
    preserved_tokens = re.findall(r'[A-Z]\.?[A-Z]\.?(?:/[A-Z]\.?[A-Z]\.?)?', text)
    # Replace them temporarily
    for i, token in enumerate(preserved_tokens):
        text = text.replace(token, f"__PRESERVED_{i}__")
    
    # Standard tokenization
    tokens = text.lower().split()
    
    # Restore preserved tokens
    for i, token in enumerate(preserved_tokens):
        for j, t in enumerate(tokens):
            if t == f"__preserved_{i}__":
                tokens[j] = token
    
    return tokens
```

In [None]:
class TextVectorizer:
    """Whitespace tokenizer that maps lower-cased tokens to integer ids.

    Ids 0 and 1 are reserved for the padding and unknown tokens. The remaining
    ids are handed out by descending token frequency in ``fit_on_texts``.
    """

    def __init__(self, max_tokens=VOCAB_SIZE, output_sequence_length=TEXT_SEQ_OUTPUT_LEN):
        """Store the vocabulary cap and the fixed output length; no fitting happens here."""
        self.max_tokens = max_tokens
        self.output_sequence_length = output_sequence_length
        # Lookup tables start empty; fit_on_texts fills token_to_idx / idx_to_token.
        self.vocab = {}
        self.token_to_idx = {}
        self.idx_to_token = {}
        # Reserved ids for the two special tokens.
        self.pad_token_id = 0
        self.unk_token_id = 1

    def fit_on_texts(self, texts: List[str]):
        """Build the token/id tables from ``texts`` and print the vocabulary size.

        Tokens are produced by lower-casing each text and splitting on whitespace.
        At most ``max_tokens - 2`` of the most frequent tokens are kept, because
        two ids are reserved for the pad and unk tokens.
        """
        frequencies = Counter(
            token for text in texts for token in text.lower().split()
        )
        ranked_tokens = [token for token, _ in frequencies.most_common(self.max_tokens - 2)]

        mapping = {PAD_TOKEN: self.pad_token_id, UNK_TOKEN: self.unk_token_id}
        # Regular tokens are numbered from 2, right after pad and unk.
        mapping.update((token, idx) for idx, token in enumerate(ranked_tokens, start=2))

        self.token_to_idx = mapping
        self.idx_to_token = {idx: token for token, idx in mapping.items()}
        self.vocab_size = len(mapping)
        print(f"Vocabulary size: {self.vocab_size}")

    def texts_to_sequences(self, texts: List[str]) -> List[List[int]]:
        """Encode each text as a list of exactly ``output_sequence_length`` ids.

        Tokens missing from the vocabulary map to the unk id. Shorter sequences
        are right-padded with the pad id and longer ones are truncated.
        """
        target_len = self.output_sequence_length
        lookup = self.token_to_idx.get
        encoded = []
        for text in texts:
            ids = [lookup(token, self.unk_token_id) for token in text.lower().split()]
            ids = ids[:target_len]
            # A non-positive multiplier yields an empty list, so full-length rows stay as they are.
            ids.extend([self.pad_token_id] * (target_len - len(ids)))
            encoded.append(ids)
        return encoded

### Load annotations

In [None]:
# CSV file that holds the annotated rows (read with pd.read_csv below).
annotations_filename = "annotationsv3.csv"
# Name of the CSV column whose values are the JSON-encoded cells of each row.
annotations_row_data = 'raw_cells_json'

TODO:
- replace iterrows
```
feature_series = df_annotations['raw_cells_list'] \
    .apply(extract_single_row_features)
df_processed = pd.DataFrame(feature_series.tolist())
df_processed[['text','numerics']].apply(pd.Series)
```

In [None]:
# Load the annotated rows: each record carries the JSON-encoded cells of one row
# together with its label and the file/sheet it came from.
df_annotations = pd.read_csv(annotations_filename)

# Decode the JSON cell lists; records without cell data get an empty list.
df_annotations['raw_cells_list'] = df_annotations[annotations_row_data].apply(
    lambda x: json.loads(x) if pd.notna(x) else [])

# Featurise every row once, then zip the results with the metadata columns.
# This avoids DataFrame.iterrows(), which builds a Series per row and is slow.
row_features = [extract_single_row_features(cells) for cells in df_annotations['raw_cells_list']]
all_row_data = [
    {
        'text': features['concatenated_text_clean'],
        'numerics': features['numeric_features'],
        'label': label,
        'filename': filename,
        'sheetname': sheetname
    }
    for features, label, filename, sheetname in zip(
        row_features,
        df_annotations['label'],
        df_annotations['filename'],
        df_annotations['sheetname'],
    )
]
df_processed_rows = pd.DataFrame(all_row_data)

In [None]:
# Preview the first rows of the processed per-row table.
df_processed_rows.head()

## Group by sheet