In [None]:
# !pip install torch torchtext numpy

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re
import string
import numpy as np
import requests
import gzip
import os

In [3]:
class CBOWDataset(Dataset):
    """(context ids, target id) training pairs for a CBOW model.

    Every in-vocabulary token of every text becomes a target. Its context is
    made of the in-vocabulary tokens found within ``window_size`` positions on
    either side, read left to right.

    Args:
        texts: iterable of raw strings.
        word_to_idx: mapping from token to integer id.
        window_size: number of positions inspected on each side of the target.
        context_size: fixed length of every stored context. Longer contexts are
            truncated to their first ``context_size`` entries (so with a wide
            window the left-most words are the ones kept); shorter ones are
            right-padded with ``pad_idx``.
        min_context: targets with fewer in-vocabulary context words than this
            are skipped.
        pad_idx: id used for padding.
    """

    # Compiled once for the class; the patterns are applied to every text.
    _TAG_RE = re.compile(r'<.*?>')
    _PUNCT_RE = re.compile(r'[^\w\s]')

    def __init__(self, texts, word_to_idx, window_size=2, context_size=4,
                 min_context=2, pad_idx=0):
        self.data = []
        self.word_to_idx = word_to_idx
        self.window_size = window_size
        self.context_size = context_size
        self.min_context = min_context
        self.pad_idx = pad_idx

        for text in texts:
            tokens = self._preprocess_text(text)
            # Look every token up once; None marks an out-of-vocabulary token.
            ids = [word_to_idx.get(token) for token in tokens]

            for i, target in enumerate(ids):
                if target is None:
                    continue

                lo = max(0, i - window_size)
                hi = min(len(ids), i + window_size + 1)
                context = [ids[j] for j in range(lo, hi)
                           if j != i and ids[j] is not None]

                if len(context) < min_context:
                    continue

                context = context[:context_size]
                context += [pad_idx] * (context_size - len(context))
                self.data.append((context, target))

    def _preprocess_text(self, text):
        """Lower-case, strip markup and punctuation, and split into tokens.

        Tags are replaced by a space rather than removed outright, otherwise
        ``"end.<br /><br />Start"`` collapses into the single token
        ``"endstart"``. Tokens of length 1 are dropped.
        """
        text = text.lower()
        text = self._TAG_RE.sub(' ', text)
        text = self._PUNCT_RE.sub('', text)
        return [token for token in text.split() if len(token) > 1]

    def __len__(self):
        """Number of stored (context, target) pairs."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the pair at ``idx`` as two ``torch.long`` tensors."""
        context, target = self.data[idx]
        context_tensor = torch.tensor(context, dtype=torch.long)
        return context_tensor, torch.tensor(target, dtype=torch.long)


In [4]:
class CBOWModel(nn.Module):
    """Continuous bag-of-words model: averaged context embeddings -> MLP -> vocab logits.

    Args:
        vocab_size: number of rows in the embedding table and of output logits.
        embedding_dim: size of each word embedding.
        hidden_dim: width of the hidden layer.
        dropout: dropout probability applied to the hidden activations.
        pad_idx: id of the padding token. Its embedding is pinned to zero and
            it is left out of the context average. Pass ``None`` to treat every
            id as a real word.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.3, pad_idx=0):
        super().__init__()
        self.pad_idx = pad_idx
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.linear1 = nn.Linear(embedding_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, vocab_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, context):
        """Map a (batch, context_len) tensor of word ids to (batch, vocab_size) logits."""
        embeds = self.embedding(context)

        if self.pad_idx is None:
            context_vector = torch.mean(embeds, dim=1)
        else:
            # Average over real words only, so that the amount of padding in a
            # row does not change its context vector.
            mask = (context != self.pad_idx).unsqueeze(-1).to(embeds.dtype)
            # clamp: an all-padding row yields a zero vector instead of NaN.
            counts = mask.sum(dim=1).clamp(min=1.0)
            context_vector = (embeds * mask).sum(dim=1) / counts

        hidden = F.relu(self.linear1(context_vector))
        hidden = self.dropout(hidden)
        return self.linear2(hidden)

In [None]:
def download_imdb_data():
    """Return the IMDB training reviews as a list of strings.

    The archive is downloaded and unpacked into the current working directory
    only when needed: an existing ``aclImdb/train/{pos,neg}`` tree or an
    existing ``aclImdb_v1.tar.gz`` is reused, so re-running the cell does not
    hit the network again.

    Returns:
        list[str]: one entry per review file, positive reviews first, each
        group in sorted filename order. An empty list is returned (after
        printing the reason) if the data could not be obtained, so callers can
        always iterate over the result.
    """
    import glob
    import tarfile

    url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
    archive_path = "aclImdb_v1.tar.gz"
    review_dirs = ("aclImdb/train/pos", "aclImdb/train/neg")

    try:
        if not all(os.path.isdir(d) for d in review_dirs):
            if not os.path.exists(archive_path):
                print("Downloading IMDB dataset...")
                # Stream into a side file and rename at the end so that an
                # interrupted download is never mistaken for a full archive.
                partial_path = archive_path + ".part"
                with requests.get(url, stream=True, timeout=60) as response:
                    response.raise_for_status()
                    with open(partial_path, "wb") as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                os.replace(partial_path, archive_path)

            with tarfile.open(archive_path, "r:gz") as tar:
                # Use the safe extraction filter where the running Python
                # provides it; it rejects members that escape the target dir.
                if hasattr(tarfile, "data_filter"):
                    tar.extractall(filter="data")
                else:
                    tar.extractall()

        texts = []
        for review_dir in review_dirs:
            # sorted(): glob order is filesystem-dependent.
            for file_path in sorted(glob.glob(f"{review_dir}/*.txt")):
                with open(file_path, 'r', encoding='utf-8') as f:
                    texts.append(f.read())

        print(f"Loaded {len(texts)} IMDB reviews")
        return texts
    except Exception as e:
        # Best effort: report the problem and hand back an empty corpus.
        print(f"Could not download IMDB data: {e}")
        return []

imdb_texts = download_imdb_data()

In [None]:
def preprocess_text(text):
    """Lower-case ``text``, strip markup and punctuation, and return its tokens.

    HTML-like tags are replaced by a space rather than deleted, so that
    ``"movie.<br /><br />The"`` yields ``movie`` and ``the`` instead of the
    fused token ``moviethe``. Punctuation is removed in place (``don't`` ->
    ``dont``) and tokens of length 1 are dropped.

    Args:
        text: raw input string.

    Returns:
        list[str]: the surviving tokens, in order.
    """
    text = text.lower()
    text = re.sub(r'<.*?>', ' ', text)
    text = re.sub(r'[^\w\s]', '', text)
    return [token for token in text.split() if len(token) > 1]

# Count every token of the corpus.
word_counts = Counter()
for text in imdb_texts:
    word_counts.update(preprocess_text(text))

# Keep only words seen at least `min_freq` times.
min_freq = 3
vocab_words = [word for word, count in word_counts.items() if count >= min_freq]

# Id 0 is reserved for padding; real words are numbered from 1 in the order
# they were first counted.
word_to_idx = {'<PAD>': 0}
word_to_idx.update((word, i) for i, word in enumerate(vocab_words, 1))

idx_to_word = {0: '<PAD>'}
idx_to_word.update(enumerate(vocab_words, 1))

# `vocab` holds the real words only (no padding token).
vocab = set(vocab_words)
vocab_size = len(word_to_idx)

print(f"Vocabulary size: {vocab_size}")
print(f"Sample words: {list(vocab)[:10]}")

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
# Model hyperparameters. `window_size` is also what the dataset cell and
# `interactive_correction` read when they gather context words.
embedding_dim = 128
hidden_dim = 256
window_size = 5

# Fix the RNG before any weights are created so that initialisation (and the
# shuffling done later by the DataLoader) is repeatable across kernel restarts.
seed = 42
torch.manual_seed(seed)

model = CBOWModel(vocab_size, embedding_dim, hidden_dim).to(device)
print("Model initialized")
print(f"Model parameters: {sum(p.numel() for p in model.parameters())}")

In [None]:
# Build the (context, target) pairs from the reviews and wrap them in a
# shuffling mini-batch loader.
batch_size = 128
dataset = CBOWDataset(texts=imdb_texts, word_to_idx=word_to_idx, window_size=window_size)
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)

print(f"Dataset created with {len(dataset)} training samples")
print(f"Number of batches: {len(dataloader)}")


In [None]:
# Optimisation settings.
epochs = 10
learning_rate = 0.001

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Enable training-time behaviour (dropout) for the whole run.
model.train()
print("Starting training...")

for epoch in range(epochs):
    total_loss, num_batches = 0, 0

    for batch_idx, (context, target) in enumerate(dataloader):
        context, target = context.to(device), target.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        output = model(context)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

        # Progress line every 1000 batches (including the first).
        if batch_idx % 1000 == 0:
            print(f'Epoch {epoch + 1}/{epochs}, Batch {batch_idx}, Loss: {loss.item():.4f}')

    avg_loss = total_loss / num_batches
    print(f'Epoch {epoch + 1}/{epochs} completed, Average Loss: {avg_loss:.4f}')

print("Training completed!")


In [None]:
import re
from collections import defaultdict
from typing import List, Set, Dict, Tuple

def tokenize(text: str) -> List[str]:
    """Split ``text`` into lower-case tokens.

    Every run of characters other than ``a-z``, ``0-9`` and the apostrophe is
    treated as a separator.
    """
    normalised = re.sub(r"[^a-z0-9']+", " ", text.lower())
    # str.split() with no argument never produces empty strings.
    return normalised.split()

def kgrams(term: str, k: int = 2, use_boundaries: bool = False) -> Set[str]:
    """Return the set of length-``k`` substrings of ``term``.

    With ``use_boundaries`` the term is first wrapped as ``^term$``. A string
    shorter than ``k`` is returned whole, as a one-element set.
    """
    padded = f"^{term}$" if use_boundaries else term
    n_grams = len(padded) - k + 1
    if n_grams <= 0:
        return {padded}
    return {padded[start:start + k] for start in range(n_grams)}

def build_kgram_index_from_docs(documents: List[str], k: int = 2, use_boundaries: bool = False) -> Tuple[Dict[str, Set[str]], Set[str]]:
    """Build an inverted k-gram index over the vocabulary of ``documents``.

    Returns:
        A pair ``(index, vocab)``: ``vocab`` is the set of all tokens produced
        by ``tokenize`` over the documents, and ``index`` maps each k-gram to
        the set of vocabulary terms containing it.
    """
    vocab: Set[str] = set()
    for document in documents:
        vocab.update(tokenize(document))

    postings: Dict[str, Set[str]] = {}
    for term in vocab:
        for gram in kgrams(term, k, use_boundaries):
            postings.setdefault(gram, set()).add(term)

    return postings, vocab

def generate_candidates_from_index(query_word: str, index: Dict[str, Set[str]], k: int = 2, use_boundaries: bool = False, max_candidates: int = 1000) -> Set[str]:
    """Collect indexed terms that share at least one k-gram with ``query_word``.

    Posting lists are merged one k-gram at a time, and merging stops once the
    pool has reached ``max_candidates``. The check runs after each merge, so
    the result can be larger than the limit. ``query_word`` itself is never
    part of the result.
    """
    pool: Set[str] = set()
    for gram in kgrams(query_word, k, use_boundaries):
        pool.update(index.get(gram, ()))
        if len(pool) >= max_candidates:
            break

    pool.discard(query_word)
    return pool

In [25]:
def generate_candidates(word, vocab):
    """Return spelling candidates for ``word`` drawn from ``vocab``.

    Candidates are looked up in stages and the first non-empty stage wins:
    the word itself, then vocabulary words one edit away (delete, adjacent
    transpose, replace, insert over ``a-z``), then words two edits away. If
    nothing matches, the word is its own only candidate.

    Args:
        word: the token to correct.
        vocab: container of known words supporting ``in`` (a set is fastest).

    Returns:
        list[str]: the candidates in sorted order, so that callers breaking
        ties by position behave the same on every run.
    """
    letters = string.ascii_lowercase

    def edits1(term):
        """All strings exactly one edit away from ``term``."""
        splits = [(term[:i], term[i:]) for i in range(len(term) + 1)]
        deletes = [L + R[1:] for L, R in splits if R]
        transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]
        replaces = [L + c + R[1:] for L, R in splits if R for c in letters]
        inserts = [L + c + R for L, R in splits for c in letters]
        return set(deletes + transposes + replaces + inserts)

    def known(words):
        """Subset of ``words`` present in the vocabulary."""
        return {w for w in words if w in vocab}

    # `or` evaluates lazily, so the costly two-edit stage only runs when the
    # cheaper stages found nothing. That stage is streamed through a generator
    # instead of being materialised as one huge set first.
    candidates = (
        known([word])
        or known(edits1(word))
        or known(e2 for e1 in edits1(word) for e2 in edits1(e1))
        or {word}
    )
    return sorted(candidates)

def get_context_score(model, word_to_idx, context_words, candidate, context_size=4):
    """Probability the model assigns to ``candidate`` given ``context_words``.

    Args:
        model: module mapping a (1, context_size) tensor of word ids to
            (1, vocab_size) logits.
        word_to_idx: mapping from word to integer id.
        context_words: surrounding words; those missing from ``word_to_idx``
            are ignored.
        candidate: the word to score.
        context_size: length the context is truncated or zero-padded to.

    Returns:
        float: ``-inf`` if ``candidate`` is not in ``word_to_idx``; ``0.0`` if
        no context word is known; otherwise the softmax probability of
        ``candidate``.

    Note:
        The model is switched to eval mode and left in that mode.
    """
    if candidate not in word_to_idx:
        return float('-inf')

    context_indices = [word_to_idx[word] for word in context_words if word in word_to_idx]
    if not context_indices:
        return 0.0

    context_indices = context_indices[:context_size]
    context_indices += [0] * (context_size - len(context_indices))

    # Run on whatever device the model lives on instead of depending on a
    # notebook-level `device` variable being defined and in sync.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')
    context_tensor = torch.tensor([context_indices], dtype=torch.long, device=model_device)

    model.eval()
    with torch.no_grad():
        output = model(context_tensor)
        probabilities = F.softmax(output, dim=1)

    return probabilities[0, word_to_idx[candidate]].item()

In [26]:
def correct_spelling(sentence, model, word_to_idx, vocab, window_size=2):
    """Return ``sentence`` with out-of-vocabulary words replaced by the best candidate.

    The sentence is tokenised with ``preprocess_text``. Words found in
    ``vocab`` are kept unchanged. For every other word, each result of
    ``generate_candidates`` is scored with ``get_context_score`` against the
    words up to ``window_size`` positions away, and the first candidate with
    the strictly highest score is used. The original word is kept if no
    candidate scores above ``-inf``.

    Returns:
        str: the resulting tokens joined by single spaces.
    """
    words = preprocess_text(sentence)
    corrected_words = []

    for position, word in enumerate(words):
        if word in vocab:
            corrected_words.append(word)
            continue

        # Neighbouring words on both sides, excluding the word itself.
        first = max(0, position - window_size)
        stop = min(len(words), position + window_size + 1)
        context_words = [words[j] for j in range(first, stop) if j != position]

        best_candidate, best_score = word, float('-inf')
        for candidate in generate_candidates(word, vocab):
            score = get_context_score(model, word_to_idx, context_words, candidate)
            if score > best_score:
                best_candidate, best_score = candidate, score

        corrected_words.append(best_candidate)

    return ' '.join(corrected_words)

In [28]:
def interactive_correction(sentence):
    """Correct ``sentence`` with the notebook's model and print before/after.

    Reads the notebook-level ``model``, ``word_to_idx``, ``vocab`` and
    ``window_size``.

    Returns:
        str: the corrected sentence.
    """
    corrected = correct_spelling(
        sentence,
        model=model,
        word_to_idx=word_to_idx,
        vocab=vocab,
        window_size=window_size,
    )
    print(f"Original:  {sentence}")
    print(f"Corrected: {corrected}")
    return corrected


In [None]:
# Try the corrector on a sample sentence; interactive_correction prints the
# original and corrected text and returns the corrected string.
custom_sentence = "ahmdabad form delh"
result = interactive_correction(custom_sentence)