In [27]:
import pandas as pd

In [28]:
df = pd.read_csv("persian_emails.csv")


In [29]:
df.head()

Unnamed: 0,text,label
0,موضوع: درخواست پشتیبانی برای بیش از حد داغ می‌...,customer_support
1,موضوع: شکایت از اسپیکر بلوتوثی قابل حمل\n\nبا ...,customer_support
2,موضوع: شکایت از اسپیکر بلوتوثی قابل حمل\n\nدرو...,customer_support
3,موضوع: شرایط خرید سرویس اینترنت\n\nدرود بی‌کرا...,sales_inquiry
4,موضوع: درخواست پشتیبانی برای اتصال بلوتوث قطع ...,customer_support


In [30]:
import re
import string
import pandas as pd
import unicodedata
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

# 0) Define your Persian stop‑words
stop_words = {
    'و', 'در', 'به', 'از', 'که', 'این', 'را', 'با', 'است', 'برای', 'آن', 'یک', 'خود',
    'تا', 'کرد', 'بر', 'هم', 'نیز', 'گفت', 'تواند', 'باشد', 'شد', 'اما', 'دارد',
    'باید', 'او', 'می', 'دهد', 'یا', 'همه', 'کنند', 'اگر', 'آنها', 'بود', 'وی',
    'کنید', 'کند', 'داده', 'بوده', 'دارند', 'شود', 'چون', 'جز', 'من', 'ما',
    'تو', 'شما', 'ایشان'
}

# 1) Drop rows with null text/label
df = df.dropna(subset=['text', 'label'])

# 2) Map labels → integers
label_map = {
    'customer_support': 0,
    'sales_inquiry':    1,
    'partnership':      2,
    'spam':             3
}
df['label'] = df['label'].map(label_map)

# 3) Clean & tokenize function
def preprocess_text(text):
    text = unicodedata.normalize('NFKC', text.lower())
    text = re.sub(r'[\d%s]+' % re.escape(string.punctuation), ' ', text)
    tokens = text.split()
    tokens = [t for t in tokens if t not in stop_words and len(t) > 1]
    return ' '.join(tokens)

df['text'] = df['text'].apply(preprocess_text)

# 4) Split features and target, then stratify
X = df['text']
y = df['label']
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42,
    stratify=y
)

# 5) Fit tokenizer on train only
tokenizer = Tokenizer(num_words=5000, oov_token='<OOV>')
tokenizer.fit_on_texts(X_train)

# 6) Convert to sequences and determine max length
train_seqs = tokenizer.texts_to_sequences(X_train)
max_len = min(200, int(np.percentile([len(s) for s in train_seqs], 95)))
train_padded = pad_sequences(train_seqs, maxlen=max_len, padding='post')

test_seqs = tokenizer.texts_to_sequences(X_test)
test_padded = pad_sequences(test_seqs, maxlen=max_len, padding='post')

# 7) Build DataLoaders
def to_loader(X_arr, y_arr, bs=32, shuffle=False):
    Xt = torch.tensor(X_arr, dtype=torch.long)
    yt = torch.tensor(y_arr.values, dtype=torch.long)
    ds = TensorDataset(Xt, yt)
    return DataLoader(ds, batch_size=bs, shuffle=shuffle)

train_loader = to_loader(train_padded, y_train, bs=32, shuffle=True)
test_loader  = to_loader(test_padded,  y_test,  bs=32)

print("Preprocessing done. DataLoaders are ready—no label leakage.")

Preprocessing done. DataLoaders are ready—no label leakage.


In [31]:
train_padded.shape

(1600, 36)

In [32]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import logging
from tqdm import tqdm

# Define the CNN model
class TextCNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_filters, filter_sizes, num_classes, dropout=0.5):
        super(TextCNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=fs)
            for fs in filter_sizes
        ])
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(len(filter_sizes) * num_filters, num_classes)
        
    def forward(self, x):
        x = self.embedding(x)  # (batch_size, seq_len, embedding_dim)
        x = x.transpose(1, 2)  # (batch_size, embedding_dim, seq_len)
        x = [F.relu(conv(x)) for conv in self.convs]  # [(batch_size, num_filters, seq_len - fs + 1), ...]
        x = [F.max_pool1d(conv_out, conv_out.size(2)).squeeze(2) for conv_out in x]  # [(batch_size, num_filters), ...]
        x = torch.cat(x, 1)  # (batch_size, num_filters * len(filter_sizes))
        x = self.dropout(x)
        logits = self.fc(x)  # (batch_size, num_classes)
        return logits

# Set up logging
logging.basicConfig(filename='training.log', level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger()
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
logger.addHandler(console_handler)

# Training function
def train_model(model, train_loader, test_loader, num_epochs, learning_rate, device):
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        avg_loss = total_loss / len(train_loader)
        logger.info(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")
        
        # Evaluate on test set
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        accuracy = correct / total
        logger.info(f"Test Accuracy: {accuracy:.4f}")
    
    return model

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
vocab_size = 5001  # Adjust based on tokenizer
embedding_dim = 100
num_filters = 128
filter_sizes = [3, 4, 5]
num_classes = 4  # Adjust based on your dataset
dropout = 0.6
num_epochs = 10
learning_rate = 0.001

# Initialize and train the model
model = TextCNN(vocab_size, embedding_dim, num_filters, filter_sizes, num_classes, dropout).to(device)
trained_model = train_model(model, train_loader, test_loader, num_epochs, learning_rate, device)

# Evaluate the final model on test set
trained_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = trained_model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
cnn_accuracy = correct / total
print(f"CNN Test Accuracy: {cnn_accuracy:.4f}")

Epoch 1/10: 100%|██████████| 50/50 [00:00<00:00, 152.04it/s]
Epoch 1/10, Loss: 0.6428
Epoch 1/10, Loss: 0.6428
Epoch 1/10, Loss: 0.6428
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Epoch 2/10: 100%|██████████| 50/50 [00:00<00:00, 170.12it/s]
Epoch 2/10, Loss: 0.0622
Epoch 2/10, Loss: 0.0622
Epoch 2/10, Loss: 0.0622
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Epoch 3/10: 100%|██████████| 50/50 [00:00<00:00, 180.31it/s]
Epoch 3/10, Loss: 0.0250
Epoch 3/10, Loss: 0.0250
Epoch 3/10, Loss: 0.0250
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Epoch 4/10: 100%|██████████| 50/50 [00:00<00:00, 168.94it/s]
Epoch 4/10, Loss: 0.0157
Epoch 4/10, Loss: 0.0157
Epoch 4/10, Loss: 0.0157
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Epoch 5/10: 100%|██████████| 50/50 [00:00<00:00, 171.02it/s]
Epoch 5/10, Loss: 0.0102
Epoch 5/10, Loss: 0.0102
Epoch 5/10, Loss: 0.0102
Test Accuracy: 1.0000
Test Accuracy: 1.0000
Test Accurac

CNN Test Accuracy: 1.0000


In [33]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

# Vectorize the text
vectorizer = TfidfVectorizer(max_features=5000)  # To match the CNN's vocab size
X_train = vectorizer.fit_transform(train_df['text'])
X_test = vectorizer.transform(test_df['text'])
y_train = train_df['label']
y_test = test_df['label']

# Train Logistic Regression
lr_model = LogisticRegression(max_iter=1000)
lr_model.fit(X_train, y_train)
lr_pred = lr_model.predict(X_test)
lr_accuracy = accuracy_score(y_test, lr_pred)
print(f"Logistic Regression Test Accuracy: {lr_accuracy:.4f}")

# Train SVM
svm_model = SVC()
svm_model.fit(X_train, y_train)
svm_pred = svm_model.predict(X_test)
svm_accuracy = accuracy_score(y_test, svm_pred)
print(f"SVM Test Accuracy: {svm_accuracy:.4f}")

# Compare accuracies (assuming cnn_accuracy is available from the previous code block)
print("\nAccuracy Comparison:")
print(f"CNN: {cnn_accuracy:.4f}")
print(f"Logistic Regression: {lr_accuracy:.4f}")
print(f"SVM: {svm_accuracy:.4f}")

Logistic Regression Test Accuracy: 1.0000
SVM Test Accuracy: 1.0000

Accuracy Comparison:
CNN: 1.0000
Logistic Regression: 1.0000
SVM: 1.0000


In [34]:
import re
import string
import unicodedata
import torch
import torch.nn.functional as F
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Invert your label_map for human‑readable output
label_map_inv = {0: 'customer_support',
                 1: 'sales_inquiry',
                 2: 'partnership',
                 3: 'spam'}

def predict_text(text: str,
                 model: torch.nn.Module,
                 tokenizer,
                 max_len: int,
                 device: torch.device) -> (str, torch.Tensor):
    """
    Preprocesses a raw Persian string, tokenizes, pads, runs through the trained TextCNN,
    and returns (predicted_label, softmax_probs).
    """
    # 1) Clean & tokenize (must match your training pipeline)
    text = unicodedata.normalize('NFKC', text.lower())
    text = re.sub(r'[\d%s]+' % re.escape(string.punctuation), ' ', text)
    tokens = text.split()
    tokens = [t for t in tokens if t not in stop_words and len(t) > 1]
    cleaned = ' '.join(tokens)

    # 2) Sequence & pad
    seq = tokenizer.texts_to_sequences([cleaned])
    padded = pad_sequences(seq, maxlen=max_len, padding='post')

    # 3) To tensor
    inp = torch.tensor(padded, dtype=torch.long).to(device)

    # 4) Forward
    model.eval()
    with torch.no_grad():
        logits = model(inp)                   # (1, num_classes)
        probs  = F.softmax(logits, dim=1).squeeze(0)  # (num_classes,)

    # 5) Decode
    idx = torch.argmax(probs).item()
    label = label_map_inv[idx]
    return label, probs.cpu()

# Example usage:
label, probs = predict_text("سلام درخواست همکاری در حوزه دارم با سپاش", trained_model, tokenizer, max_len, device)
print(label, probs)

partnership tensor([0.0057, 0.1671, 0.8208, 0.0064])


In [36]:
import torch
import pickle

# This script saves the trained TextCNN model and associated tokenizer and config.
# Run this after training completes and the following variables are available in your namespace:
# `trained_model`, `tokenizer`, `max_len`, `label_map_inv`, `stop_words`.

# Save model state_dict
torch.save(trained_model.state_dict(), 'textcnn.pth')

# Save tokenizer
with open('tokenizer.pkl', 'wb') as f:
    pickle.dump(tokenizer, f)

# Save configuration (preprocessing & model architecture)
model_config = {
    'vocab_size': trained_model.embedding.num_embeddings,
    'embedding_dim': trained_model.embedding.embedding_dim,
    'num_filters': trained_model.convs[0].out_channels,
    'filter_sizes': [conv.kernel_size[0] for conv in trained_model.convs],
    'num_classes': trained_model.fc.out_features,
    'dropout': trained_model.dropout.p
}
config = {
    'max_len': max_len,
    'label_map_inv': label_map_inv,
    'stop_words': stop_words,
    'model_config': model_config
}
with open('config.pkl', 'wb') as f:
    pickle.dump(config, f)

print('Export complete: textcnn.pth, tokenizer.pkl, config.pkl')


Export complete: textcnn.pth, tokenizer.pkl, config.pkl
