# Persian-English Translator Project (Classical Seq2Seq)

This notebook implements a classical Persian↔English translator using Seq2Seq (LSTM). It includes preprocessing, POS tagging, NER, tokenization, embedding, model design, training, and evaluation.

Before training embeddings and classifiers, the raw shenasa/English-Persian-Parallel-Dataset is cleaned, normalized, and tokenized. This step is essential for Persian (Farsi) text, which often mixes scripts and punctuation styles and contains noise.

---


## Table of contents
1. Project goals
2. Environment setup
3. Data collection
4. Preprocessing
5. POS tagging
6. NER
7. Tokenization & Embedding
8. Seq2Seq model
9. Evaluation
10. Analysis and report



## 2. Environment setup

In [None]:
!pip install --upgrade pip
!pip install numpy pandas matplotlib scikit-learn tensorflow keras hazm parsivar sacrebleu nltk sentencepiece
# Optional: for advanced Persian NER datasets and models
!pip install datasets transformers seqeval

In [None]:
#%load_ext cudf.pandas

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import sklearn
import tensorflow as tf
import keras

import hazm
import parsivar
import sacrebleu
import nltk
import sentencepiece
import datasets
import transformers
import seqeval
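Since the install cell pulls in many packages without pinned versions, it can help to record what actually got installed. The following is a minimal sketch using only the standard library; the `PACKAGES` list and the `installed_versions` helper are illustrative names, not part of the original notebook.

```python
from importlib.metadata import PackageNotFoundError, version

# Distribution names as passed to pip in the install cell (illustrative subset).
PACKAGES = ["numpy", "pandas", "tensorflow", "hazm", "sacrebleu", "datasets", "transformers"]

def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is missing."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found

for name, ver in installed_versions(PACKAGES).items():
    print(f"{name:<14} {ver or 'NOT INSTALLED'}")
```

Saving this output next to the results makes later runs easier to reproduce.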

## 3. Data collection

### 3.1: Imports

In [None]:
from datasets import load_dataset, DatasetDict
import re, os
from pprint import pprint

### 3.2: Load dataset and inspect (normal load)

In [None]:
# Data collection: load the Hugging Face dataset (full, non-streaming)

DATASET_ID = "shenasa/English-Persian-Parallel-Dataset"

print("Loading dataset (this may take a minute)...")
data_set = load_dataset(DATASET_ID)   # loads all splits (here only 'train')
print("Dataset is loaded successfully")

In [None]:
# quick summary (splits, size)
print(data_set)

# Print column names and example
split_name = list(data_set.keys())[0]      # should be 'train'
print("Split:", split_name)
print("Columns:", data_set[split_name].column_names)
print("\nOne sample (0):")
pprint(data_set[split_name][0])

## 4. Preprocessing (Normalization & cleaning)

### 4.1: Imports

In [None]:
import re
from datasets import Dataset, DatasetDict
from hazm import Normalizer, word_tokenize, stopwords_list
from nltk.corpus import stopwords
#import spacy

In [None]:
nltk.download('stopwords')

### 4.2: Select a part of dataset

In [None]:
# take a random sample of 500k
ds = DatasetDict({
    "train": data_set["train"].shuffle(seed=42).select(range(500_000))
})
print(ds)

### 4.3: Auto-detect which column is Persian / English & rename

In [None]:
# Detect Persian / English columns
def is_persian(s):
    if s is None:
        return False
    return bool(re.search(r'[\u0600-\u06FF]', s))

cols = ds[split_name].column_names
sample = ds[split_name][0]

persian_col = None
english_col = None
for c in cols:
    try:
        if is_persian(sample[c]):
            persian_col = c
        else:
            english_col = c
    except Exception:
        pass

print("Detected Persian column:", persian_col)
print("Detected English column:", english_col)


# Helper functions to detect unwanted text
def contains_english(text):
    if text is None:
        return False
    return bool(re.search(r"[A-Za-z]", text))

def contains_persian(text):
    if text is None:
        return False
    return bool(re.search(r"[\u0600-\u06FF]", text))


# Filter dataset
filtered_dataset = ds[split_name].filter(
    lambda x: not contains_english(x[persian_col]) and not contains_persian(x[english_col])
)

# Wrap back into a DatasetDict to keep the "train" key
ds = DatasetDict({"train": filtered_dataset})
print(ds)


- Caveat: this filter is strict. Any Persian sentence that contains even a single English (Latin) character is dropped, and likewise any English sentence that contains a Persian character.

In [None]:
ds
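The filter in 4.3 drops a Persian sentence as soon as it contains one Latin letter. One softer alternative, sketched here under assumptions, is to drop a pair only when the share of foreign-script characters exceeds a threshold. The helper names and the 0.3 threshold are hypothetical and should be tuned on a sample of the data.

```python
import re

LATIN_RE = re.compile(r"[A-Za-z]")
PERSIAN_RE = re.compile(r"[\u0600-\u06FF]")   # same Unicode block as is_persian()

MAX_FOREIGN_RATIO = 0.3   # assumed threshold, not taken from the notebook

def script_ratio(text, pattern):
    """Fraction of non-space characters in `text` that match `pattern`."""
    chars = [ch for ch in (text or "") if not ch.isspace()]
    if not chars:
        return 0.0
    return sum(1 for ch in chars if pattern.match(ch)) / len(chars)

def keep_pair(persian_text, english_text, max_ratio=MAX_FOREIGN_RATIO):
    """Keep the pair unless either side is dominated by the other script."""
    return (script_ratio(persian_text, LATIN_RE) <= max_ratio
            and script_ratio(english_text, PERSIAN_RE) <= max_ratio)
```

With the notebook's variables, this could be applied as `ds[split_name].filter(lambda x: keep_pair(x[persian_col], x[english_col]))`, which keeps sentences that mention a product name or acronym in Latin script.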

### 4.4: Rename columns to standard names and run light cleaning

In [None]:
def rename_and_select(example):
    return {"persian": example[persian_col], "english": example[english_col]}

print("Mapping and renaming columns...")
ds_simple = ds[split_name].map(rename_and_select, remove_columns=ds[split_name].column_names) # split_name="train"

print("Old columns:", ds[split_name].column_names)
print("New columns:", ds_simple.column_names)

In [None]:
print(ds)
print(ds_simple)

### 4.5: Cleaning helpers (Persian normalizer + English lite cleaning)

In [None]:
normalizer = Normalizer()

# Maps Arabic to Persian chars
AR_TO_FA = {'\u064A': '\u06CC', '\u0643': '\u06A9'}
# Zero-width and directional control characters: ZWNJ (U+200C), RLM (U+200F), LRE (U+202A), RLE (U+202B)
ZERO_WIDTH = ['\u200c', '\u200f', '\u202a', '\u202b']

PERSIAN_DIGITS = '۰۱۲۳۴۵۶۷۸۹'
ASCII_DIGITS = '0123456789'

# Persian + English stopwords
persian_stopwords = set(stopwords_list())
english_stopwords = set(stopwords.words("english"))

# --- Cleaning functions ---
def replace_arabic_chars(text):
    for a, f in AR_TO_FA.items():
        text = text.replace(a, f)
    return text

def remove_zero_width(text):
    for zw in ZERO_WIDTH:
        text = text.replace(zw, " ")
    return text

def normalize_digits(text):
    for fa, en in zip(PERSIAN_DIGITS, ASCII_DIGITS):
        text = text.replace(fa, en)
    return text

def clean_html(text):
    return re.sub(r"<.*?>", " ", text)

def clean_urls(text):
    return re.sub(r"http\S+|www\.\S+", " ", text)

def remove_unwanted_chars(text):
    # Keep Persian and English letters, ASCII digits, and whitespace; replace everything else with a space
    return re.sub(r"[^آ-یA-Za-z0-9\s]", " ", text)

def remove_extra_spaces(text):
    return re.sub(r"\s+", " ", text).strip()


def remove_stopwords(text):
    tokens = word_tokenize(text)
    tokens = [t for t in tokens if t not in persian_stopwords and t.lower() not in english_stopwords]
    return " ".join(tokens)
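The character replacements above (`replace_arabic_chars` and `normalize_digits`) each loop over `str.replace`. As an alternative sketch, the same mappings can be merged into one translation table and applied in a single pass. `CHAR_MAP`, `TRANSLATION_TABLE`, and `normalize_chars` are illustrative names that do not appear in the notebook.

```python
# Arabic letter variants -> Persian letters (same pairs as AR_TO_FA).
CHAR_MAP = {"\u064A": "\u06CC", "\u0643": "\u06A9"}
# Persian digits U+06F0..U+06F9 -> ASCII digits 0..9.
CHAR_MAP.update({chr(0x06F0 + i): str(i) for i in range(10)})

TRANSLATION_TABLE = str.maketrans(CHAR_MAP)

def normalize_chars(text):
    """Apply all single-character replacements in one pass over the string."""
    return text.translate(TRANSLATION_TABLE)

print(normalize_chars("\u0643\u064A \u06F1\u06F2\u06F3"))
```

On a corpus of several hundred thousand sentences, one `translate` call per string is usually cheaper than a dozen chained `replace` calls, although the difference was not measured here.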



In [None]:
def clean_persian(text):
    if text is None: return ""
    text = str(text)
    text = text.replace("٫", " ")
    text = text.replace(",", " ")
    text = clean_html(text)
    text = clean_urls(text)
    text = replace_arabic_chars(text)
    text = remove_zero_width(text)
    text = normalize_digits(text)
    text = normalizer.normalize(text)
    text = normalize_digits(text)
    text = remove_unwanted_chars(text)
    text = remove_extra_spaces(text)
    #text = remove_stopwords(text)
    return text

def clean_english(text):
    if text is None: return ""
    text = str(text).strip()
    text = text.replace("٫", " ")
    text = text.replace(",", " ")
    # optional: lowercasing (depends if you want to preserve casing)
    text = text.lower()
    text = re.sub(r'\s+', ' ', text)
    text = clean_html(text)
    text = clean_urls(text)
    text = remove_zero_width(text)
    text = normalizer.normalize(text)
    text = normalize_digits(text)
    text = remove_unwanted_chars(text)
    text = remove_extra_spaces(text)
    #text = remove_stopwords(text)
    return text

# Quick test
print(clean_persian("این٫  ۱۲۳ کتاب‌ هاست‎"))
print(clean_english(" This IS  ,a Test! "))
print(clean_persian(" کاتالوگ 4 5۵, ٫ مگابایت"))
print(clean_english(" کاتالوگ ۵۴ 7, ٫ مگابایت"))

### 4.6: Apply cleaning

In [None]:
# Apply cleaning to dataset (use num_proc if Colab CPU allows parallelism)
def clean_example(ex):
    return {
        "persian": clean_persian(ex["persian"]),
        "english": clean_english(ex["english"])
    }

print("Cleaning dataset (one example at a time)...")
ds_clean = ds_simple.map(clean_example, batched=False)  # batched=True can be faster but needs function change
print("After cleaning sample:", ds_clean[0])
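The comment above notes that `batched=True` needs a different function. In batched mode, `map` passes a dictionary whose values are lists, one entry per row, so the function has to loop over each column. A minimal sketch follows; `demo_clean` is a hypothetical stand-in so the block runs on its own, and in the notebook `clean_persian` and `clean_english` would be passed instead.

```python
import re

def demo_clean(text):
    """Hypothetical stand-in for clean_persian / clean_english: trim and collapse whitespace."""
    if text is None:
        return ""
    return re.sub(r"\s+", " ", str(text)).strip()

def clean_batch(batch, clean_fa=demo_clean, clean_en=demo_clean):
    """Batched variant: `batch` maps each column name to a list of values."""
    return {
        "persian": [clean_fa(t) for t in batch["persian"]],
        "english": [clean_en(t) for t in batch["english"]],
    }

# Plain-dict demo that mimics the shape of one batch.
print(clean_batch({"persian": ["  a   b ", None], "english": [" Hello   world ", "ok"]}))
```

In the notebook this would be called as `ds_simple.map(lambda b: clean_batch(b, clean_persian, clean_english), batched=True, batch_size=1000)`, where the batch size is an assumption to adjust to the available memory.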

In [None]:
# Filter out empty or too-long sentences (character-length heuristic)
MAX_CHARS = 512
def filter_empty_or_long(ex):
    if ex["persian"].strip()=="" or ex["english"].strip()=="":
        return False
    if len(ex["persian"]) > MAX_CHARS or len(ex["english"]) > MAX_CHARS:
        return False
    return True

ds_clean = ds_clean.filter(filter_empty_or_long)
print("Size after filter:", len(ds_clean))
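The filter above limits sentences by character count. Because a Seq2Seq model pads sequences to a token length, a token-based limit may be a closer proxy. The sketch below counts whitespace-separated tokens; `MAX_TOKENS` and `within_token_limit` are assumed names, and the value 50 is only a placeholder.

```python
MAX_TOKENS = 50   # assumed limit, not taken from the notebook

def within_token_limit(example, max_tokens=MAX_TOKENS):
    """True if both sides are non-empty and at most `max_tokens` whitespace tokens long."""
    fa_len = len(example["persian"].split())
    en_len = len(example["english"].split())
    return 0 < fa_len <= max_tokens and 0 < en_len <= max_tokens
```

It can be chained after the existing filter, for example `ds_clean.filter(within_token_limit)`.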

In [None]:
print("some examples:")

for i in range(5):
  print(f"\n {i}-raw:" ,(ds[split_name][i]))
  print(f"\n {i}-Pe_En:" ,(ds_simple[i]))
  print(f"\n {i}-clean:" ,(ds_clean[i]))
  print(f"\n")

### 4.7: Apply cleaning with stop-word removal

In [None]:
def clean_stopwords_persian(text):
    if text is None: return ""
    text = remove_stopwords(text)
    return text

def clean_stopwords_english(text):
    if text is None: return ""
    text = remove_stopwords(text)
    return text

# Quick test
print(clean_stopwords_persian(clean_persian("این٫  ۱۲۳ کتاب‌ هاست‎")))
print(clean_stopwords_english(clean_english(" This IS  ,a Test! ")))
print(clean_stopwords_persian(clean_persian(" کاتالوگ, ٫ مگابایت")))
print(clean_stopwords_english(clean_english(" کاتالوگ, ٫ مگابایت")))

In [None]:
# Apply stop-word removal to the dataset (use num_proc if Colab CPU allows parallelism)
# A separate function name avoids silently overwriting clean_example from section 4.6.
def clean_stopwords_example(ex):
    return {
        "persian": clean_stopwords_persian(ex["persian"]),
        "english": clean_stopwords_english(ex["english"])
    }

print("Cleaning dataset ...")
ds_stopwords_clean = ds_clean.map(clean_stopwords_example, batched=False)  # batched=True can be faster but needs function change
print("After stop-words cleaning sample:", ds_stopwords_clean[0])

In [None]:
print("some examples:")

for i in range(5):
  print(f"\n {i}-raw:" ,(ds[split_name][i]))
  #print(f"\n {i}-Pe_En:" ,(ds_simple[i]))
  print(f"\n {i}-clean:" ,(ds_clean[i]))
  print(f"\n {i}-clean_stopwords:" ,(ds_stopwords_clean[i]))
  print(f"\n")

### 4.8: Create train/validation/test splits

**4.8.1: ds_clean**

In [None]:
def split_dataset(ds_clean, seed: int = 42):

    total = len(ds_clean)
    print("Total pairs:", total)

    if total > 1000:
        # Large dataset: ~3% holdout, split into val/test (1.5% each)
        split1 = ds_clean.train_test_split(test_size=0.03, seed=seed, shuffle=True)
        hold = split1['test'].train_test_split(test_size=0.5, seed=seed)
        dataset_splits = DatasetDict({
            'train': split1['train'],
            'validation': hold['train'],
            'test': hold['test']
        })
    else:
        # Small dataset: 80/10/10
        split1 = ds_clean.train_test_split(test_size=0.2, seed=seed, shuffle=True)
        hold = split1['test'].train_test_split(test_size=0.5, seed=seed)
        dataset_splits = DatasetDict({
            'train': split1['train'],
            'validation': hold['train'],
            'test': hold['test']
        })

    # Print sizes
    for k in dataset_splits:
        print(f"{k}: {len(dataset_splits[k])}")

    # Peek at one example
    print("\nExample pair (train[10]):")
    print(dataset_splits['train'][10])

    return dataset_splits


In [None]:
dataset_splits = split_dataset(ds_clean)
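To make the split proportions concrete, the arithmetic behind `split_dataset` can be sketched as below. The function name and the totals are illustrative, and the exact rounding is decided by `train_test_split`, so the real sizes may differ by a row or so.

```python
def approx_split_sizes(total, holdout_frac):
    """Approximate train/validation/test sizes for a holdout that is halved into val and test."""
    holdout = round(total * holdout_frac)
    validation = holdout // 2
    test = holdout - validation
    return {"train": total - holdout, "validation": validation, "test": test}

# Large dataset branch: 3% holdout, about 1.5% each for validation and test.
print(approx_split_sizes(100_000, 0.03))
# Small dataset branch: 80/10/10.
print(approx_split_sizes(800, 0.2))
```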

In [None]:
# To build the splits from the stop-word-filtered data instead, uncomment:
#dataset_splits = split_dataset(ds_stopwords_clean)

### 4.9: Save to disk / optional Google Drive mount

In [None]:
# Option A: save to Colab local storage
os.makedirs("data", exist_ok=True)
dataset_splits['train'].to_csv("data/train.csv")
dataset_splits['validation'].to_csv("data/validation.csv")
dataset_splits['test'].to_csv("data/test.csv")
print("Saved CSVs to ./data/")

In [None]:
# Option B: save to Google Drive (uncomment to use)
'''
from google.colab import drive
drive.mount('/content/drive')
out_dir = "/content/drive/MyDrive/persian_translation_dataset"
os.makedirs(out_dir, exist_ok=True)
dataset_splits['train'].to_csv(os.path.join(out_dir,"train.csv"))
dataset_splits['validation'].to_csv(os.path.join(out_dir,"validation.csv"))
dataset_splits['test'].to_csv(os.path.join(out_dir,"test.csv"))
print("Saved CSVs to Google Drive:", out_dir)
'''

## 5. POS tagging (Hazm)

In [None]:
# Test the hazm POS tagger model
from hazm import POSTagger, word_tokenize

tagger = POSTagger(model='pos_tagger.model')
tagger.tag(word_tokenize('ما بسیار کتاب می‌خوانیم'))
# Expected output:
# [('ما', 'PRO'), ('بسیار', 'ADV'), ('کتاب', 'N'), ('می‌خوانیم', 'V')]


In [None]:
from hazm import Normalizer, word_tokenize, POSTagger

# Run this cell once per split (train / validation / test), changing IN_CSV and OUT_CSV accordingly

# ==== config (edit as needed) ====
IN_CSV   = "data/train.csv"            # must contain a 'persian' column (and optionally 'english')
OUT_CSV  = "data/train_pos.csv"
MODEL    = "pos_tagger.model"         # e.g. "resources/postagger.model"
DELIM    = ","                        # change if your CSV uses another delimiter
NROWS    = None                       # set to an int (e.g., 10) to sample only first N rows
# =================================

# load data
df = pd.read_csv(IN_CSV, delimiter=DELIM, nrows=NROWS)

# hazm components
tagger = POSTagger(model=MODEL)
norm = Normalizer(persian_numbers=True)

def pos_tags_only(text: str) -> list[str]:
    """Return POS tag sequence (no tokens), one tag per token."""
    if not isinstance(text, str) or not text.strip():
        return []
    # normalize + tokenize with the SAME toolchain you'll use elsewhere
    toks = word_tokenize(norm.normalize(text))
    tagged = tagger.tag(toks)              # [(tok, POS), ...]
    return [pos for _, pos in tagged]      # keep only POS

def tokenize(text: str) -> list[str]:
    if not isinstance(text, str) or not text.strip():
        return []
    return word_tokenize(norm.normalize(text))

# build columns
df["persian_tok"] = df["persian"].apply(tokenize)
df["persian_pos"] = df["persian"].apply(lambda s: " ".join(pos_tags_only(s)))

# (optional) quick alignment check
def count_ws(s): return len(str(s).split())
align_ok = []
bad_rows = []
for i, (sent, tags_str) in enumerate(zip(df["persian"], df["persian_pos"])):
    n_tok = len(tokenize(sent))
    n_tag = count_ws(tags_str)
    ok = (n_tok == n_tag)
    align_ok.append(ok)
    if not ok and len(bad_rows) < 10:
        bad_rows.append((i, n_tok, n_tag, sent, tags_str))

df["pos_align_ok"] = align_ok

# save
cols_to_save = [c for c in ["persian", "english", "persian_pos"] if c in df.columns]
df[cols_to_save].to_csv(OUT_CSV, index=False)
print(f"Saved -> {OUT_CSV}")
print(f"Alignment OK: {sum(align_ok)}/{len(align_ok)} "
      f"({100*sum(align_ok)/max(1,len(align_ok)):.1f}%)")

if bad_rows:
    print("\nFirst few misaligned rows (index, n_tok, n_tag):")
    for i, n_tok, n_tag, sent, tags in bad_rows:
        print(f"- row {i}: tokens={n_tok}, tags={n_tag}")
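The cell above stores the tags as one space-separated string per sentence, without the tokens. To use them later, the string has to be split and paired with tokens produced by the same tokenizer. A minimal sketch of that pairing, with a hypothetical helper name and the token/tag example taken from the tagger test in this section:

```python
def pair_tokens_with_tags(tokens, tag_str):
    """Pair each token with its POS tag; fail loudly if the two sequences are misaligned."""
    tags = tag_str.split()
    if len(tokens) != len(tags):
        raise ValueError(f"{len(tokens)} tokens but {len(tags)} tags")
    return list(zip(tokens, tags))

tokens = ["ما", "بسیار", "کتاب", "می‌خوانیم"]
print(pair_tokens_with_tags(tokens, "PRO ADV N V"))
```

Raising on a mismatch, rather than truncating with `zip`, surfaces rows where re-tokenization produced a different number of tokens than the tagger saw.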


The data now carries a POS tag sequence for each Persian sentence.

---


## 6. Named Entity Recognition

In [None]:
from transformers import pipeline

ner_fa = pipeline(
    "token-classification",
    model="HooshvareLab/bert-fa-base-uncased-ner-peyma",
    tokenizer="HooshvareLab/bert-fa-base-uncased-ner-peyma",
    aggregation_strategy="simple"
)


Run the cells below once for each split (train / validation / test).



In [None]:
from datasets import load_dataset
import json

# --- Load CSV (expects a column named "persian") ---
ds = load_dataset("csv", data_files="data/train_pos.csv", split="train")


In [None]:
# -*- coding: utf-8 -*-

import re
import json


# --- NER batch function: emits per-token tags like "0 0 0 LOCATION 0" (the digit "0" marks non-entity tokens) ---
def run_ner(batch):
    texts = batch["persian"]                  # list[str]
    ents_list = ner_fa(texts)                 # list[list[dict]] ; aggregation_strategy="simple"

    def norm_label(lbl: str) -> str:
        if not lbl:
            return "0"
        s = str(lbl).strip()
        if s[:2] in ("B_", "I_", "B-", "I-"):
            s = s[2:]
        s = s.upper()
        alias = {
            "PER": "PERSON", "PERS": "PERSON", "PERSON": "PERSON",
            "ORG": "ORG", "ORGANIZATION": "ORG",
            "LOC": "LOCATION", "LOCATION": "LOCATION", "GPE": "LOCATION",
            "FAC": "FACILITY", "FACILITY": "FACILITY",
            "DAT": "DATE", "DATE": "DATE",
            "TIM": "TIME", "TIME": "TIME",
            "MON": "MONEY", "MONEY": "MONEY",
            "PCT": "PERCENT", "PERCENT": "PERCENT",
            "QUANTITY": "QUANTITY", "CARDINAL": "CARDINAL", "ORDINAL": "ORDINAL",
            "MISC": "MISC", "EVENT": "EVENT", "PRODUCT": "PRODUCT",
            "WORK_OF_ART": "WORK_OF_ART", "LAW": "LAW", "LANGUAGE": "LANGUAGE",
            "NORP": "NORP",
        }
        return alias.get(s, s)

    def tag_sequence(text: str, ents: list[dict]) -> str:
        """
        Tokenize by whitespace (runs of non-space chars) and align labels by char spans.
        Non-entity tokens get "0". If a token overlaps any entity span, it gets that entity's label.
        """
        # 1) whitespace tokens with char spans
        tokens = [(m.group(0), m.start(), m.end()) for m in re.finditer(r"\S+", text or "")]
        # 2) entity spans (start,end,label)
        spans = []
        for e in ents or []:
            lbl = e.get("entity_group") or e.get("entity") or ""
            start = e.get("start")
            end = e.get("end")
            if lbl and start is not None and end is not None:
                spans.append((int(start), int(end), norm_label(lbl)))
        # 3) assign label per token (simple overlap rule)
        tags = []
        for _, t0, t1 in tokens:
            tag = "0"
            for s0, s1, lab in spans:
                if t0 < s1 and t1 > s0:  # overlap
                    tag = lab
                    break
            tags.append(tag)
        return " ".join(tags)

    # Build outputs
    seqs = []
    for text, ents in zip(texts, ents_list):
        seqs.append(tag_sequence(text, ents))

    batch["fa_ner_seq"] = seqs   # e.g., "0 0 0 LOCATION 0"
    return batch


# ---- Map over dataset ----
# NOTE: assumes you already have `ds` and `ner_fa`.
ds_out = ds.map(run_ner, batched=True, batch_size=32)

# ---- Save with fa_ner_seq as the LAST column ----
# We’ll keep original column order and append the new column at the end.
orig_cols = ds.column_names
wanted_order = orig_cols + ["fa_ner_seq"]
wanted_order = [c for c in wanted_order if c in ds_out.column_names]  # robust if columns vary

# Option A (simple & explicit control via pandas)
df = ds_out.to_pandas()
df = df[wanted_order]
df.to_csv("data/train_pos_ner.csv", index=False)
print("Saved train_pos_ner.csv with 'fa_ner_seq' appended as the last column.")
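The overlap rule inside `tag_sequence` can be tried in isolation, without loading the model. The sketch below reimplements only that rule; `align_tags` is an illustrative name and the entity span is written by hand, not produced by the NER pipeline.

```python
import re

def align_tags(text, spans, outside="0"):
    """Label each whitespace token with the first entity span it overlaps.

    `spans` is a list of (start, end, label) character spans with exclusive end.
    """
    tags = []
    for m in re.finditer(r"\S+", text):
        tag = outside
        for start, end, label in spans:
            if m.start() < end and m.end() > start:   # character ranges overlap
                tag = label
                break
        tags.append(tag)
    return tags

text = "علی به تهران رفت"
start = text.index("تهران")
spans = [(start, start + len("تهران"), "LOCATION")]   # hand-written span for illustration
print(" ".join(align_tags(text, spans)))
```

Note that a span covering only part of a token still labels the whole token, and when two spans touch the same token the first one in the list wins.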


At this point the dataset has been preprocessed and annotated with POS and NER tags.

---


## 7. Tokenization & embedding

## 8. Seq2Seq model

## 9. Evaluation

## End of Notebook
This Colab notebook covers preprocessing, POS tagging, NER, tokenization, Seq2Seq model, and evaluation.